[jira] [Created] (KAFKA-6816) Tables in documentation are truncated

2018-04-23 Thread Hashan Gayasri Udugahapattuwa (JIRA)
Hashan Gayasri Udugahapattuwa created KAFKA-6816:


 Summary: Tables in documentation are truncated
 Key: KAFKA-6816
 URL: https://issues.apache.org/jira/browse/KAFKA-6816
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 1.1.0
 Environment: Firefox,Chrome (Fedora release 27)
Reporter: Hashan Gayasri Udugahapattuwa
 Attachments: Screenshot from 2018-04-23 12-17-11.png

Example page : 
[https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#id10]

[https://kafka.apache.org/css/styles.css] : 394

.right{overflow:hidden}

causes data in tables to be truncated

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-23 Thread Odin Standal (JIRA)
Odin Standal created KAFKA-6817:
---

 Summary: UnknownProducerIdException when writing messages with old 
timestamps
 Key: KAFKA-6817
 URL: https://issues.apache.org/jira/browse/KAFKA-6817
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.1.0
Reporter: Odin Standal


We are seeing the following exception in our Kafka application:
{code:java}
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due 
to the following error: org.apache.kafka.streams.errors.StreamsException: task 
[0_0] Abort sending since an error caught with a previous record (key 22 
value some-value timestamp 1519200902670) to topic exactly-once-test-topic- v2 
due to This exception is raised by the broker if it could not locate the 
producer metadata associated with the producerId in question. This could happen 
if, for instance, the producer's records were deleted because their retention 
time had elapsed. Once the last records of the producerId are removed, the 
producer's metadata is removed from the broker, and future appends by the 
producer will return this exception. at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
 at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
 at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
 at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.kafka.common.errors.UnknownProducerIdException

{code}
We discovered this error when we had the need to reprocess old messages.

We have reproduced the error with a smaller example application. The error 
occurs after 10 minutes of producing messages that have old timestamps (around 1 
year old). The topic we are writing to has retention.ms set to 1 year, so we 
are expecting the messages to stay there.
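
For reference, a minimal plain-producer sketch of how such old-timestamped 
records can be produced (the broker address is a placeholder; topic, key and 
value mirror the log above, and our actual application uses Streams rather 
than a bare producer):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OldTimestampProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // A record timestamp roughly one year in the past: well beyond the default
        // 7-day transactional.id.expiration.ms window, but within the topic's retention.
        long oneYearAgo = System.currentTimeMillis() - 365L * 24 * 60 * 60 * 1000;

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("exactly-once-test-topic-v2", null,
                    oneYearAgo, "22", "some-value"));
        }
    }
}
{code}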

After digging through the ProducerStateManager code in the Kafka source, we 
have a theory about what might be wrong.

ProducerStateManager.removeExpiredProducers() seems to remove producers from 
memory erroneously when processing records whose timestamps are older than 
maxProducerIdExpirationMs (derived from the `transactional.id.expiration.ms` 
configuration, which defaults to 7 days).
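
For illustration, a rough Java paraphrase of the check we believe is happening 
(the actual broker code is Scala, inside ProducerStateManager, and may differ 
in detail):
{code:java}
public class ProducerExpiryCheckSketch {
    // Paraphrase of the suspected expiry condition: the producer entry is judged
    // against the timestamp of its last appended record, not against wall-clock
    // producer activity. With record timestamps ~1 year in the past, the difference
    // exceeds the default 7-day window immediately, so the producer state is dropped
    // even though the records themselves are still within retention.
    static boolean isExpired(long nowMs, long lastRecordTimestampMs, long maxProducerIdExpirationMs) {
        return nowMs - lastRecordTimestampMs >= maxProducerIdExpirationMs;
    }
}
{code}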

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6799) Consumer livelock during consumer group rebalance

2018-04-23 Thread Attila Sasvari (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari resolved KAFKA-6799.
---
Resolution: Information Provided

> Consumer livelock during consumer group rebalance
> -
>
> Key: KAFKA-6799
> URL: https://issues.apache.org/jira/browse/KAFKA-6799
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0
>Reporter: Pierre-Henri Dezanneau
>Assignee: Attila Sasvari
>Priority: Critical
>
> We have the following environment:
> * 1 kafka cluster with 3 brokers
> * 1 topic with 3 partitions
> * 1 producer
> * 1 consumer group with 3 consumers
> From this setup, we remove one broker from the cluster, the hard way, by 
> simply killing it. Quite often, we see that the consumer group is not 
> rebalanced correctly. By that I mean that all 3 consumers stop consuming and 
> get stuck in a loop, forever.
> The thread dump shows that the consumer threads aren't blocked but run 
> forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due 
> to the {{synchronized}} keyword on the calling method. Heartbeat threads are 
> blocked, waiting for the consumer threads to release the lock. This situation 
> prevents all consumers from consuming any more records.
> We built a simple project which seems to reliably demonstrate this:
> {code:sh}
> $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
> $ git clone https://github.com/phdezann/helloworld-kafka-livelock
> $ cd helloworld-kafka-livelock && ./spin.sh
> ...
> livelock detected
> {code}
> {code:sh|title=Consumer thread|borderStyle=solid}
> "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
> at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
> - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
> - locked <0x2a17> (a sun.nio.ch.Util$3)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:684)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
> - locked <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> at 
> org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
> at 
> org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown
>  Source:-1)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:sh|title=Heartbeat thread|borderStyle=solid}
> "kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 
> tid=0x36 nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at java.lang.Object.wait(Object.java:-1)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Luís Cabral
 Hi Guozhang,

Thank you very much for your patience in explaining your points; I've learnt 
quite a bit researching and experimenting after your replies.


bq. I still think it is worth defining `timestamp` as a special compaction value

I don't personally see advantages in it, but the only disadvantage that I 
can think of is putting multiple meanings on this field, which does not seem 
enough to dissuade anyone, so I've added it to the KIP as a compromise. 
(Please also see the pull request in case you want to confirm the 
implementation matches your idea.)


bq. Should it be "the record with the highest value will be kept"?


That is describing a scenario where the records being compared have the same 
value, in which case the offset is used as a tie-breaker. 
In trying to cover as much as possible, the "Proposed Changes" section may have 
become confusing to read, sorry about that...


bq. Users are then responsible to encode their compaction field according to 
the byte array lexico-ordering to full fill their ordering semantics. It is 
more flexible to enforce users to encode their compaction field always as a 
long type.

This was indeed my focus on the previous replies, since I am not sure how this 
would work without adding a lot of responsibility on the client side. 
So, rather than trying to debate best practices, since I don't know which ones 
are being followed in this project, I will instead debate my own selfish need 
for this feature: 
Having it this way would jeopardize my own particular use case, as I need to 
have an incremental number representing the version (i.e.: 1, 2, 3, 5, 52, et 
cetera). It does not totally invalidate it, since we can always convert it to 
String on the client side and left-pad with 0's to the max length of a long, 
but it seems a shame to have to do this as it would increase the data transfer 
size (I'm trying to avoid it becoming a bottleneck during high throughput 
periods). This would likely mean that I would start abusing the "timestamp" 
approach discussed above, as it keeps the messages nimble, but it would again 
be a shame to be forced into such a hacky solution.
This is how I see it, and why I would like to avoid it. But maybe there is some 
smarter way that you know of on how to handle it on the client side that would 
invalidate these concerns?
Please let me know, and I would also greatly value some more feedback from 
other people regarding this topic, so please don't be shy! 

Kind Regards,
Luis

On Friday, April 20, 2018, 7:41:30 PM GMT+2, Guozhang Wang wrote:
 
 Hi Luís,

What I'm thinking primarily is that we only need to compare the compaction
values as LONG for the offset and timestamp "type" (I still think it is
worth defining `timestamp` as a special compaction value, with the reasons
below).

Not sure if you've seen my other comment earlier regarding the offset /
timestamp; I'm pasting / editing it here to illustrate my idea:

--

I think maybe we have a mis-communication here: I'm not against the idea of
using headers, but just trying to argue that we could make `timestamp`
field a special config value that is referring to the timestamp field in
the metadata. So from log cleaner's pov:

1. if the config value is "offset", look into the offset field, *comparing
their value as long*
2. if the config value is "timestamp", look into the timestamp field,
*comparing
their value as long*
3. otherwise, say the config value is "foo", search for key "foo" in the
message header, comparing the value as *byte arrays*

I.e. "offset" and "timestamp" are treated as special cases other than case
3) above.

--

I think your main concern is that "Although the byte[] can be compared, it
is not actually comparable as the versioning is based on a long", while I'm
thinking we can indeed generalize it: there is no hard reason that the
"compaction value" has to be a long, and since the goal of this KIP is to
generalize the log compaction logic to consider header fields, why not
allow it to be of any type rather than still enforcing a long type?
Users are then responsible for encoding their compaction field according to
the byte array lexicographic ordering to fulfill their ordering semantics. That
is more flexible than enforcing users to always encode their compaction field
as a long type. Let me know WDYT.



Also I have some minor comments on the wiki itself:

1) "When both records being compared contain a matching "compaction value",
then the record with the highest offset will be kept;"

Should it be "the record with the highest value will be kept"?




Guozhang


On Fri, Apr 20, 2018 at 1:05 AM, Luís Cabral 
wrote:

>  Guozhang, is this reply ok with you?
>
>
> If you insist on the byte[] comparison directly, then I would need some
> suggestions on how to represent a "version" with it, and then the KIP could
> be changed to that.
>    On Tuesday, April 17, 2018, 2:44:16 PM GMT+2, Luís Cabral <
> luis_cab...@yahoo.com> wrote:
>
>  Oops

[jira] [Created] (KAFKA-6818) Efficient way to get last n rows via admin client

2018-04-23 Thread Werner Daehn (JIRA)
Werner Daehn created KAFKA-6818:
---

 Summary: Efficient way to get last n rows via admin client
 Key: KAFKA-6818
 URL: https://issues.apache.org/jira/browse/KAFKA-6818
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 1.1.0
Reporter: Werner Daehn


Peeking into the last records of a topic is quite common, e.g. for data 
preview. 

Doing that, however, can be quite difficult: you need to get the current end 
offset for each partition, seek to an earlier point so that the overall record 
count across all partitions sums to n, handle the case where there are fewer 
than maxcount records in the topic, etc.
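
For illustration, a rough consumer-based sketch of those manual steps (consumer 
configuration is omitted, the per-partition arithmetic is simplistic, and 
poll(long) is the 1.1-era API; this is the workaround being described, not a 
proposed API):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TopicPreviewSketch {

    // Find end offsets, seek back by a share of maxCount per partition, then poll.
    static List<ConsumerRecord<String, String>> preview(KafkaConsumer<String, String> consumer,
                                                        String topic, int maxCount) {
        List<TopicPartition> partitions = new ArrayList<>();
        consumer.partitionsFor(topic)
                .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
        consumer.assign(partitions);

        Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
        Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

        // Clamp to the beginning offset: this is where "fewer than maxCount records"
        // has to be handled by hand.
        int perPartition = Math.max(1, maxCount / partitions.size());
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, Math.max(begin.get(tp), end.get(tp) - perPartition));
        }

        List<ConsumerRecord<String, String>> result = new ArrayList<>();
        while (result.size() < maxCount) {
            ConsumerRecords<String, String> records = consumer.poll(1000L);
            if (records.isEmpty()) break; // nothing more to read
            records.forEach(result::add);
        }
        return result.size() > maxCount ? result.subList(0, maxCount) : result;
    }
}
{code}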

It would be nice if there were a simplified API for just that, maybe even in 
the admin client.

List preview(String topicname, int maxcount);

 

A second API for getting all data since a timestamp would be nice as well, 
although in that case the amount of data could be huge, so I am not convinced 
it would be better than a normal seek/attach operation.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Guozhang Wang
Hello Luis,

Thanks for your email, replying to your points in the following:

> I don't personally see advantages in it, but also the only disadvantage
that I can think of is putting multiple meanings on this field.

If we do not treat timestamp as a special value of the config, then I
cannot use the timestamp field of the record as the compaction value, since
we would only look into the record header (apart from the default offset),
right? Then users wanting to use the timestamp as the compaction value would
have to put that timestamp into the record header under some name, which
duplicates the field unnecessarily. So to me, without treating it as a special
value we are doomed to have a duplicated record field.

> Having it this way would jeopardize my own particular use case, as I need
to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
52, et cetera)

The issue that I do not understand completely is why you keep saying that
we need to convert it to a String first, then convert it to other
types. Since the header is organized as:

public interface Header {

String key();

byte[] value();

}


This means that the header value can be of any type. So for your use
case, why can't you just serialize your incremental version number into a
byte array directly, whose lexicographic order obeys the version number value? I
think the default byte serialization mechanism of the integer is sufficient
for this purpose (assuming that the increment number is an int).
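
As a minimal sketch of that idea (my own illustration, assuming non-negative
version numbers such as 1, 2, 3, 5, 52; negative values would need an
order-preserving encoding):

import java.nio.ByteBuffer;

public class VersionHeaderSketch {

    // Big-endian bytes of a non-negative int: unsigned lexicographic byte order
    // matches numeric order, so the cleaner could compare the raw header bytes.
    static byte[] encodeVersion(int version) {
        if (version < 0) throw new IllegalArgumentException("non-negative versions only");
        return ByteBuffer.allocate(Integer.BYTES).putInt(version).array();
    }

    // Unsigned lexicographic comparison of equal-length byte arrays.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return 0;
    }

    public static void main(String[] args) {
        // 52 sorts after 5 on the raw bytes, with no String conversion or padding.
        System.out.println(compareUnsigned(encodeVersion(52), encodeVersion(5)) > 0); // true
    }
}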



Guozhang




On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral 
wrote:

>  Hi Guozhang,
>
> Thank you very much for the patience in explaining your points, I've
> learnt quite a bit in researching and experimenting after your replies.
>
>
> bq. I still think it is worth defining `timestamp` as a special compaction
> value
>
> I don't personally see advantages in it, but also the only disadvantage
> that I can think of is putting multiple meanings on this field, which does
> not seem enough to dissuade anyone, so I've added it to the KIP as a
> compromise.
> (please also see the pull request in case you want to confirm the
> implementation matches your idea)
>
>
> bq. Should it be "the record with the highest value will be kept"?
>
>
> That is describing a scenario where the records being compared have the
> same value, in which case the offset is used as a tie-breaker.
> With trying to cover as much as possible, the "Proposed Changes" may have
> became confusing to read, sorry for that...
>
>
> bq. Users are then responsible to encode their compaction field according
> to the byte array lexico-ordering to full fill their ordering semantics. It
> is more flexible to enforce users to encode their compaction field always
> as a long type.
>
> This was indeed my focus on the previous replies, since I am not sure how
> this would work without adding a lot of responsibility on the client side.
> So, rather than trying to debate best practices, since I don't know which
> ones are being followed in this project, I will instead debate my own
> selfish need for this feature:
> Having it this way would jeopardize my own particular use case, as I need
> to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> 52, et cetera). It does not totally invalidate it, since we can always
> convert it to String on the client side and left-pad with 0's to the max
> length of a long, but it seems a shame to have to do this as it would
> increase the data transfer size (I'm trying to avoid it becoming a
> bottleneck during high throughput periods). This would likely mean that I
> would start abusing the "timestamp" approach discussed above, as it keeps
> the messages nimble, but it would again be a shame to be forced into such a
> hacky solution.
> This is how I see it, and why I would like to avoid it. But maybe there is
> some smarter way that you know of on how to handle it on the client side
> that would invalidate these concerns?
> Please let me know, and I would also greatly value some more feedback from
> other people regarding this topic, so please don't be shy!
>
> Kind Regards,LuisOn Friday, April 20, 2018, 7:41:30 PM GMT+2, Guozhang
> Wang  wrote:
>
>  Hi Luís,
>
> What I'm thinking primarily is that we only need to compare the compaction
> values as LONG for the offset and timestmap "type" (I still think it is
> worth defining `timestamp` as a special compaction value, with the reasons
> below).
>
> Not sure if you've seen my other comment earlier regarding the offset /
> timestmap, I'm pasting / editing them here to illustrate my idea:
>
> --
>
> I think maybe we have a mis-communication here: I'm not against the idea of
> using headers, but just trying to argue that we could make `timestamp`
> field a special config value that is referring to the timestamp field in
> the metadata. So from log cleaner's pov:
>
> 1. if the config value is "offset", look into the offset field, *comparing
> their value as long*
> 2. if the confi

Re: [DISCUSS] KIP-278: Add version option to Kafka's commands

2018-04-23 Thread Colin McCabe
Hi Sasaki,

Thanks for the KIP.  I think a version flag is a good idea.

Can you give a little more detail about what would be displayed when the 
version command was used?

We clearly want the version number, but we probably also want to know if this 
is an official release, or a random SNAPSHOT from a branch.  If this is a 
release candidate, we probably want the RC number as well, like "1.1-rc3".  We 
also want a git hash.  This can be injected by the build process.  In the case 
of an official release, where the source code is not under git, we can pull it 
from a file.

For example, hadoop's version output looks like this:

 > cmccabe@aurora:~/Downloads/hadoop-2.8.3> ./bin/hadoop version
 > Hadoop 2.8.3
 > Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
 > b3fe56402d908019d99af1f1f4fc65cb1d1436a2
 > Compiled by jdu on 2017-12-05T03:43Z
 > Compiled with protoc 2.5.0
 > From source with checksum 9ff4856d824e983fa510d3f843e3f19d
 > This command was run using 
 > /home/cmccabe/Downloads/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

(The "subversion" line here is a little weird -- it now refers to git, not svn)

On Wed, Apr 11, 2018, at 13:58, Jason Gustafson wrote:
> Hey Sasaki,
> 
> Yeah, I don't feel too strongly about only supporting --version. I agree it
> may help discoverability given the current approach. On the other hand, if
> we refactored all of the tools so that we could use a common set of base
> options, it might be a little annoying to have to continue supporting both
> variations. For example, tool standardization was proposed in KIP-14 and
> I'm still holding out hope that someone will have time to pick this work
> back up. It's always easier to add an option than remove one, so I'm
> slightly inclined to have only --version for now. What do you think?

The double dash version is more consistent with how our other flags work.

In general, I feel that if --version is supported, --help should say so.

best,
Colin


> 
> Thanks,
> Jason
> 
> On Tue, Apr 10, 2018 at 12:00 AM, Sasaki Toru 
> wrote:
> 
> > Hi Jason
> >
> > Thank you for helpful comments. I updated wiki based on your advice.
> >
> > I thought this option was relatively common and making maintenance easy
> > was also important.
> > However, as you said, it is not good that version option won't be shown up
> > in help description.
> >
> > I thought accepting both single-dash and double-dash would help users find
> > this option.
> > In my approach this option won't be shown, but most software which has
> > this option accepts either single-dash or double-dash.
> > I guess it doesn't need to support both if we take another approach.
> >
> >
> > Thanks
> >
> > @Ted Yeah, you're right. Sorry about the confusion.
> >>
> >> Since we're here, I think this KIP is a nice improvement. It's definitely
> >> nice to have an easy way to check the version. That said, do we really
> >> need
> >> to support both `-version` and `--version`? The latter is consistent with
> >> our current tools.
> >>
> >> Also, I think the approach we've taken is basically to build the --version
> >> functionality into the bash script. This is nice because it saves a lot of
> >> work to update the commands individually and we don't need to do anything
> >> when we add new tools. The downside is that `--version` won't show up as
> >> an
> >> option in any of the --help output. Not sure if that is too big of a
> >> problem, but maybe worth mentioning this in the rejected alternatives
> >> section.
> >>
> >>
> >> -Jason
> >>
> >> On Wed, Apr 4, 2018 at 9:42 AM, Ted Yu  wrote:
> >>
> >> Jason:
> >>> Maybe your reply was intended for another KIP ?
> >>>
> >>> KIP-278 is about adding version option, not timeout.
> >>>
> >>> Cheers
> >>>
> >>> On Wed, Apr 4, 2018 at 9:36 AM, Jason Gustafson 
> >>> wrote:
> >>>
> >>> Hi Sasaki,
> 
>  Thanks for the KIP. I think the timeout controls the maximum allowed
> 
> >>> time
> >>
> >>> that the consumer will block for the next record. Maybe the meaning
> 
> >>> would
> >>
> >>> be clearer with the more concise name `--timeout`? That also fits with
> 
> >>> the
> >>>
>  old consumer which overrides the `consumer.timeout.ms` property.
> 
>  By the way, it seems like the default value was intentionally set low
> 
> >>> for
> >>
> >>> both the old and new consumers, but I'm not sure of the reason. We could
>  leave the default as it is if we want to be safe, but increasing it
> 
> >>> seems
> >>
> >>> ok to me. Perhaps we could start a little lower, though, say 10 seconds?
> 
> >>> In
> >>>
>  any case, we should make it clear to the user that the timeout was
> 
> >>> reached.
> >>>
>  It's surprising to see only the incomplete reported results following a
>  timeout.
> 
>  Thanks,
>  Jason
> 
>  On Wed, Apr 4, 2018 at 4:37 AM, Sasaki Toru 
>  wrote:
> 
>  Hello everyone,
> >
> > I would like to start a discussi

[jira] [Resolved] (KAFKA-6815) "default.production.exception.handler" default value is not specified correctly in KafkaStream doc

2018-04-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6815.
--
   Resolution: Fixed
Fix Version/s: 1.0.0
   1.1.0

> "default.production.exception.handler" default value is not specified 
> correctly in KafkaStream doc
> --
>
> Key: KAFKA-6815
> URL: https://issues.apache.org/jira/browse/KAFKA-6815
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.1.0
>Reporter: taekyung kim
>Priority: Minor
> Fix For: 1.1.0, 1.0.0
>
>
>  
> [https://kafka.apache.org/11/documentation/streams/developer-guide/config-streams.html]
> The 1.1 document shows an invalid default value:
> |default.deserialization.exception.handler|Medium|Exception handling class 
> that implements the {{DeserializationExceptionHandler}} interface.|3 
> milliseconds|
>  
> The actual default is the LogAndFailExceptionHandler class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Luís Cabral
Hello Guozhang,

Thanks for the fast reply!

As for the matter of the timestamp, it’s now added to the KIP, so I hope this 
is correctly addressed.
Kindly let me know if you would like some adaptations to the concept.


bq. The issue that I do not understand completely is why you'd keep saying that 
why we need to convert it to a String, first then converting to any other 
fields.

Maybe I’m over-engineering it again, and the problem can be simplified to 
restricting this to values greater than or equal to zero, which ends up being 
ok for my own use case...
This would then generally guarantee the lexicographic ordering, as you say.
Is this what you mean? Should I then add this restriction to the KIP?

Cheers,
Luis

From: Guozhang Wang
Sent: 23 April 2018 17:55
To: dev@kafka.apache.org
Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

Hello Luis,

Thanks for your email, replying to your points in the following:

> I don't personally see advantages in it, but also the only disadvantage
that I can think of is putting multiple meanings on this field.

If we do not treat timestamp as a special value of the config, then I
cannot use the timestamp field of the record as the compaction value, since
we will only look into the record header other than the default offset,
right? Then users wanting to use the timestamp as the compaction value have
to put that timestamp into the record header with a name, which duplicates
the field unnecessary. So to me without treating it as a special value we
are doomed to have duplicate record field.

> Having it this way would jeopardize my own particular use case, as I need
to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
52, et cetera)

The issue that I do not understand completely is why you'd keep saying that
why we need to convert it to a String, first then converting to any other
fields. Since the header is organized in:

public interface Header {

String key();

byte[] value();

}


Which means that the header value can be of any types. So with your use
case why can't you just serialize your incremental version number into a
byte array directly, whose lexico-order obeys the version number value?? I
think the default byte serialization mechanism of the integer is sufficient
for this purpose (assuming that increment number is int).



Guozhang




On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral 
wrote:

>  Hi Guozhang,
>
> Thank you very much for the patience in explaining your points, I've
> learnt quite a bit in researching and experimenting after your replies.
>
>
> bq. I still think it is worth defining `timestamp` as a special compaction
> value
>
> I don't personally see advantages in it, but also the only disadvantage
> that I can think of is putting multiple meanings on this field, which does
> not seem enough to dissuade anyone, so I've added it to the KIP as a
> compromise.
> (please also see the pull request in case you want to confirm the
> implementation matches your idea)
>
>
> bq. Should it be "the record with the highest value will be kept"?
>
>
> That is describing a scenario where the records being compared have the
> same value, in which case the offset is used as a tie-breaker.
> With trying to cover as much as possible, the "Proposed Changes" may have
> became confusing to read, sorry for that...
>
>
> bq. Users are then responsible to encode their compaction field according
> to the byte array lexico-ordering to full fill their ordering semantics. It
> is more flexible to enforce users to encode their compaction field always
> as a long type.
>
> This was indeed my focus on the previous replies, since I am not sure how
> this would work without adding a lot of responsibility on the client side.
> So, rather than trying to debate best practices, since I don't know which
> ones are being followed in this project, I will instead debate my own
> selfish need for this feature:
> Having it this way would jeopardize my own particular use case, as I need
> to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> 52, et cetera). It does not totally invalidate it, since we can always
> convert it to String on the client side and left-pad with 0's to the max
> length of a long, but it seems a shame to have to do this as it would
> increase the data transfer size (I'm trying to avoid it becoming a
> bottleneck during high throughput periods). This would likely mean that I
> would start abusing the "timestamp" approach discussed above, as it keeps
> the messages nimble, but it would again be a shame to be forced into such a
> hacky solution.
> This is how I see it, and why I would like to avoid it. But maybe there is
> some smarter way that you know of on how to handle it on the client side
> that would invalidate these concerns?
> Please let me know, and I would also greatly value some more feedback from
> other people regarding this topic, so please don't be shy!
>
> Kind Regards,LuisOn Friday, April 20,

Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Guozhang Wang
Hi Luis,

I think by "generalizing it" we could go beyond numerical values, and
that's why I suggested we do not need to require that the type serialized
to the bytes have any numerical semantics since it has to ben serialized to
a byte array anyways. I understand that for your use case, the intended
record header compaction value is a number, but imagine if someone else
wants to compact the same-keyed messages based on some record header
key-value pair whose value types before serializing to bytes are not
numbers at all, but just some strings:

key: "A", value: "a1", header: ["bar" -> "a".bytes()],
key: "A", value: "a2", header: ["bar" -> "c".bytes()],
key: "A", value: "a3", header: ["bar" -> "b".bytes()],


Could we allow them to use that header for compaction as well?


Now going back to your use case: for numbers that could be negative, as long
as users are aware of the requirement and change the default encoding scheme
when they generate the producer record and set the headers, so that the
serialized bytes still obey the value ordering, that should be OK (again, as I
said, we push this responsibility to users to define the right serde
mechanism, but that seems to be more flexible). For example: -INF serialized
to 0x0000, -INF+1 serialized to 0x0001, etc.
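
One possible order-preserving encoding for signed values (my own sketch, not
part of the KIP): flip the sign bit before writing big-endian bytes, so that
unsigned byte comparison agrees with the signed numeric order:

import java.nio.ByteBuffer;

public class SignedOrderPreservingEncoding {

    // Map the signed int range [MIN_VALUE, MAX_VALUE] onto unsigned [0, 2^32-1]
    // by flipping the sign bit, then write big-endian; raw unsigned byte
    // comparison then agrees with the signed numeric order.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
    }

    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return 0;
    }

    public static void main(String[] args) {
        // -1 < 0 < 1 holds on the encoded bytes as well.
        System.out.println(compareUnsigned(encode(-1), encode(0)) < 0); // true
        System.out.println(compareUnsigned(encode(0), encode(1)) < 0);  // true
    }
}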



Guozhang





On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral  wrote:

> Hello Guozhang,
>
> Thanks for the fast reply!
>
> As for the matter of the timestamp, it’s now added to the KIP, so I hope
> this is correctly addressed.
> Kindly let me know if you would like some adaptions to the concept.
>
>
> bq. The issue that I do not understand completely is why you'd keep saying
> that why we need to convert it to a String, first then converting to any
> other fields.
>
> Maybe I’m over-engineering it again, and the problem can be simplified to
> restricting this to values greater than or equal to zero, which ends up
> being ok for my own use case...
> This would then generally guarantee the lexicographic ordering, as you say.
> Is this what you mean? Should I then add this restriction to the KIP?
>
> Cheers,
> Luis
>
> From: Guozhang Wang
> Sent: 23 April 2018 17:55
> To: dev@kafka.apache.org
> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hello Luis,
>
> Thanks for your email, replying to your points in the following:
>
> > I don't personally see advantages in it, but also the only disadvantage
> that I can think of is putting multiple meanings on this field.
>
> If we do not treat timestamp as a special value of the config, then I
> cannot use the timestamp field of the record as the compaction value, since
> we will only look into the record header other than the default offset,
> right? Then users wanting to use the timestamp as the compaction value have
> to put that timestamp into the record header with a name, which duplicates
> the field unnecessary. So to me without treating it as a special value we
> are doomed to have duplicate record field.
>
> > Having it this way would jeopardize my own particular use case, as I need
> to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> 52, et cetera)
>
> The issue that I do not understand completely is why you'd keep saying that
> why we need to convert it to a String, first then converting to any other
> fields. Since the header is organized in:
>
> public interface Header {
>
> String key();
>
> byte[] value();
>
> }
>
>
> Which means that the header value can be of any types. So with your use
> case why can't you just serialize your incremental version number into a
> byte array directly, whose lexico-order obeys the version number value?? I
> think the default byte serialization mechanism of the integer is sufficient
> for this purpose (assuming that increment number is int).
>
>
>
> Guozhang
>
>
>
>
> On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral  >
> wrote:
>
> >  Hi Guozhang,
> >
> > Thank you very much for the patience in explaining your points, I've
> > learnt quite a bit in researching and experimenting after your replies.
> >
> >
> > bq. I still think it is worth defining `timestamp` as a special
> compaction
> > value
> >
> > I don't personally see advantages in it, but also the only disadvantage
> > that I can think of is putting multiple meanings on this field, which
> does
> > not seem enough to dissuade anyone, so I've added it to the KIP as a
> > compromise.
> > (please also see the pull request in case you want to confirm the
> > implementation matches your idea)
> >
> >
> > bq. Should it be "the record with the highest value will be kept"?
> >
> >
> > That is describing a scenario where the records being compared have the
> > same value, in which case the offset is used as a tie-breaker.
> > With trying to cover as much as possible, the "Proposed Changes" may have
> > became confusing to read, sorry for that...
> >
> >
> > bq. Users are then responsible to encode their compaction field according
> > to the byte array lexico-or

[jira] [Resolved] (KAFKA-6376) Improve Streams metrics for skipped records

2018-04-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6376.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

Issue resolved by pull request 4812
[https://github.com/apache/kafka/pull/4812]

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk10 #41

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Fixed deserialization.exception.handler default value of

--
[...truncated 1.48 MB...]
kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliest STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsExportImportPlan 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsExportImportPlan 
PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToSpecificOffset 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToSpecificOffset 
PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsShiftPlus STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsShiftPlus PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLatest STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLatest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNewConsumerExistingTopic STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNewConsumerExistingTopic PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsShiftByLowerThanEarliest STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsShiftByLowerThanEarliest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThrottleOptionOnExecute STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThr

Re: [DISCUSS] KIP-278: Add version option to Kafka's commands

2018-04-23 Thread Ismael Juma
FYI, the injection via the build process that is mentioned here already
happens. See AppInfoParser.
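
For reference, a minimal sketch of how a command could print that information,
assuming the static accessors currently exposed by
org.apache.kafka.common.utils.AppInfoParser (getVersion / getCommitId):

import org.apache.kafka.common.utils.AppInfoParser;

public class VersionPrinterSketch {
    public static void main(String[] args) {
        // The version and git commit id are injected into the clients jar at build
        // time and read back here from the packaged resource.
        System.out.println("Kafka " + AppInfoParser.getVersion()
                + " (commit " + AppInfoParser.getCommitId() + ")");
    }
}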

Ismael

On Mon, Apr 23, 2018 at 9:39 AM, Colin McCabe  wrote:

> Hi Sasaki,
>
> Thanks for the KIP.  I think a version flag is a good idea.
>
> Can you give a little more detail about what would be displayed when the
> version command was used?
>
> We clearly want the version number, but we probably also want to know if
> this is an official release, or a random SNAPSHOT from a branch.  If this
> is a release candidate, we probably want the RC number as well, like
> "1.1-rc3"  We also want a git hash.  This can be injected by the build
> process.  In the case of an official release, where the source code is not
> under git, we can pull it from a file.
>
> For example, hadoop's version output looks like this:
>
>  > cmccabe@aurora:~/Downloads/hadoop-2.8.3> ./bin/hadoop version
>  > Hadoop 2.8.3
>  > Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
> b3fe56402d908019d99af1f1f4fc65cb1d1436a2
>  > Compiled by jdu on 2017-12-05T03:43Z
>  > Compiled with protoc 2.5.0
>  > From source with checksum 9ff4856d824e983fa510d3f843e3f19d
>  > This command was run using /home/cmccabe/Downloads/
> hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar
>
> (The "subversion" line here is a little weird -- it now refers to git, not
> svn)
>
> On Wed, Apr 11, 2018, at 13:58, Jason Gustafson wrote:
> > Hey Sasaki,
> >
> > Yeah, I don't feel too strongly about only supporting --version. I agree
> it
> > may help discoverability given the current approach. On the other hand,
> if
> > we refactored all of the tools so that we could use a common set of base
> > options, it might be a little annoying to have to continue supporting
> both
> > variations. For example, tool standardization was proposed in KIP-14 and
> > I'm still holding out hope that someone will have time to pick this work
> > back up. It's always easier to add an option than remove one, so I'm
> > slightly inclined to have only --version for now. What do you think?
>
> The double dash version is more consistent with how our other flags work.
>
> In general, I feel that if --version is supported, --help should say so.
>
> best,
> Colin
>
>
> >
> > Thanks,
> > Jason
> >
> > On Tue, Apr 10, 2018 at 12:00 AM, Sasaki Toru  >
> > wrote:
> >
> > > Hi Jason
> > >
> > > Thank you for helpful comments. I updated wiki based on your advice.
> > >
> > > I thought this option was relatively common and making maintenance easy
> > > was also important.
> > > However, as you said, it is not good that version option won't be
> shown up
> > > in help description.
> > >
> > > I thought accepting both single-dash and double-dash will help to find
> > > this option.
> > > In my approach this option won't be showed, but most of software which
> has
> > > this option accepts either single-dash or double-dash.
> > > I guess it doesn't need to support both if we take another way.
> > >
> > >
> > > Thanks
> > >
> > > @Ted Yeah, you're right. Sorry about the confusion.
> > >>
> > >> Since we're here, I think this KIP is a nice improvement. It's
> definitely
> > >> nice to have an easy way to check the version. That said, do we really
> > >> need
> > >> to support both `-version` and `--version`? The latter is consistent
> with
> > >> our current tools.
> > >>
> > >> Also, I think the approach we've taken is basically to build the
> --version
> > >> functionality into the bash script. This is nice because it saves a
> lot of
> > >> work to update the commands individually and we don't need to do
> anything
> > >> when we add new tools. The downside is that `--version` won't show up
> as
> > >> an
> > >> option in any of the --help output. Not sure if that is too big of a
> > >> problem, but maybe worth mentioning this in the rejected alternatives
> > >> section.
> > >>
> > >>
> > >> -Jason
> > >>
> > >> On Wed, Apr 4, 2018 at 9:42 AM, Ted Yu  wrote:
> > >>
> > >> Jason:
> > >>> Maybe your reply was intended for another KIP ?
> > >>>
> > >>> KIP-278 is about adding version option, not timeout.
> > >>>
> > >>> Cheers
> > >>>
> > >>> On Wed, Apr 4, 2018 at 9:36 AM, Jason Gustafson 
> > >>> wrote:
> > >>>
> > >>> Hi Sasaki,
> > 
> >  Thanks for the KIP. I think the timeout controls the maximum allowed
> > 
> > >>> time
> > >>
> > >>> that the consumer will block for the next record. Maybe the meaning
> > 
> > >>> would
> > >>
> > >>> be clearer with the more concise name `--timeout`? That also fits
> with
> > 
> > >>> the
> > >>>
> >  old consumer which overrides the `consumer.timeout.ms` property.
> > 
> >  By the way, it seems like the default value was intentionally set
> low
> > 
> > >>> for
> > >>
> > >>> both the old and new consumers, but I'm not sure of the reason. We
> could
> >  leave the default as it is if we want to be safe, but increasing it
> > 
> > >>> seems
> > >>
> > >>> ok to me. Perhaps we could start a little lower, 

Jenkins build is back to normal : kafka-trunk-jdk7 #3364

2018-04-23 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #2574

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Fixed deserialization.exception.handler default value of

--
[...truncated 419.51 KB...]

kafka.message.ByteBufferMessageSetTest > testWriteTo STARTED

kafka.message.ByteBufferMessageSetTest > testWriteTo PASSED

kafka.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.message.ByteBufferMessageSetTest > testIterator STARTED

kafka.message.ByteBufferMessageSetTest > testIterator PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics STARTED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testControllerMetrics STARTED

kafka.metrics.MetricsTest > testControllerMetrics PASSED

kafka.metrics.MetricsTest > testWindowsStyleTagNames STARTED

kafka.metrics.MetricsTest > testWindowsStyleTagNames PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > 
testPeriodicTokenExpiry STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > 
testPeriodicTokenExpiry PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > 
testTokenRequestsWithDelegationTokenDisabled STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > 
testTokenRequestsWithDelegationTokenDisabled PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > testDescribeToken 
STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > testDescribeToken 
PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > testCreateToken 
STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > testCreateToken 
PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > testExpireToken 
STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > testExpireToken 
PASSED

kafka.security.token.delegation.DelegationTokenManagerTest > testRenewToken 
STARTED

kafka.security.token.delegation.DelegationTokenManagerTest > testRenewToken 
PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

ka

Build failed in Jenkins: kafka-trunk-jdk10 #42

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6376; refactor skip metrics in Kafka Streams

[jason] MINOR: Fix formatting in --new-consumer deprecation warning (#4903)

--
[...truncated 1.48 MB...]
kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldInitPidWithEpochZeroForNewTransactionalId STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldInitPidWithEpochZeroForNewTransactionalId PASSED

kafka.coordinator.transaction.ProducerIdManagerTest > testExceedProducerIdLimit 
STARTED

kafka.coordinator.transaction.ProducerIdManagerTest > testExceedProducerIdLimit 
PASSED

kafka.coordinator.transaction.ProducerIdManagerTest > testGetProducerId STARTED

kafka.coordinator.transaction.ProducerIdManagerTest > testGetProducerId PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldSaveForLaterWhenLeaderUnknownButNotAvailable STARTED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldSaveForLaterWhenLeaderUnknownButNotAvailable PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldGenerateEmptyMapWhenNoRequestsOutstanding STARTED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldGenerateEmptyMapWhenNoRequestsOutstanding PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldCreateMetricsOnStarting STARTED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldCreateMetricsOnStarting PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError STARTED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError STARTED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError PASSED

kafka.coordinator.transaction.TransactionMarkerChannelManagerTest > 
shouldComplete

[jira] [Resolved] (KAFKA-6670) Implement a Scala wrapper library for Kafka Streams

2018-04-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6670.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

Issue resolved by pull request 4756
[https://github.com/apache/kafka/pull/4756]

> Implement a Scala wrapper library for Kafka Streams
> ---
>
> Key: KAFKA-6670
> URL: https://issues.apache.org/jira/browse/KAFKA-6670
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Debasish Ghosh
>Assignee: Debasish Ghosh
>Priority: Major
>  Labels: api, kip
> Fix For: 1.2.0
>
>
> Implement a Scala wrapper library for Kafka Streams.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk7 #3365

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6376; refactor skip metrics in Kafka Streams

[jason] MINOR: Fix formatting in --new-consumer deprecation warning (#4903)

--
[...truncated 412.74 KB...]
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1384)
at hudson.model.AbstractProject.poll(AbstractProject.java:1287)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsShiftByLowerThanEarliest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThrottleOptionOnExecute STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThrottleOptionOnExecute PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithBrokers STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithBroke

RE: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Luís Cabral
Hello Guozhang,

The KIP is now updated to reflect this choice in strategy.
Please let me know your thoughts there.

Kind Regards,
Luís

From: Guozhang Wang
Sent: 23 April 2018 19:32
To: dev@kafka.apache.org
Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

Hi Luis,

I think by "generalizing it" we could go beyond numerical values, and
that's why I suggested we do not need to require that the type serialized
to the bytes have any numerical semantics, since it has to be serialized to
a byte array anyways. I understand that for your use case, the intended
record header compaction value is a number, but imagine if someone else
wants to compact the same-keyed messages based on some record header
key-value pair whose value types before serializing to bytes are not
numbers at all, but just some strings:

key: "A", value: "a1", header: ["bar" -> "a".bytes()],
key: "A", value: "a2", header: ["bar" -> "c".bytes()],
key: "A", value: "a3", header: ["bar" -> "b".bytes()],


Could we allow them to use that header for compaction as well?


Now going back to your use case, for numbers that could be negative: as long as
users are aware of the requirement and change the default encoding scheme when
they generate the producer record and set the headers, so that the serialized
bytes still obey the value ordering, that should be OK (again, as I said, we
push this responsibility to users to define the right serde mechanism, but that
seems to be more flexible). For example: -INF serialized to 0x, -INF+1
serialized to 0x0001, etc.
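
As an illustration only, here is a minimal sketch of such an order-preserving
serde, assuming the compaction value is a signed long; the class name
OrderPreservingLongCodec is made up for this sketch and is not part of the KIP:

import java.nio.ByteBuffer;

// Minimal sketch: flip the sign bit and write big-endian, so that an unsigned
// lexicographic comparison of the serialized bytes yields the same order as
// comparing the numbers themselves.
public final class OrderPreservingLongCodec {

    public static byte[] encode(long value) {
        // Long.MIN_VALUE maps to 0x00...00 and Long.MAX_VALUE maps to 0xFF...FF.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Unsigned lexicographic byte-array comparison; the same comparison also
    // works for the string-valued headers in the example above, since it only
    // looks at the raw bytes.
    public static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        System.out.println(compare(encode(-1L), encode(0L)) < 0);  // true
        System.out.println(compare(encode(0L), encode(42L)) < 0);  // true
    }
}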



Guozhang





On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral  wrote:

> Hello Guozhang,
>
> Thanks for the fast reply!
>
> As for the matter of the timestamp, it’s now added to the KIP, so I hope
> this is correctly addressed.
> Kindly let me know if you would like some adaptions to the concept.
>
>
> bq. The issue that I do not understand completely is why you'd keep saying
> that we need to convert it to a String first and then convert it to other
> types.
>
> Maybe I’m over-engineering it again, and the problem can be simplified to
> restricting this to values greater than or equal to zero, which ends up
> being ok for my own use case...
> This would then generally guarantee the lexicographic ordering, as you say.
> Is this what you mean? Should I then add this restriction to the KIP?
>
> Cheers,
> Luis
>
> From: Guozhang Wang
> Sent: 23 April 2018 17:55
> To: dev@kafka.apache.org
> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hello Luis,
>
> Thanks for your email, replying to your points in the following:
>
> > I don't personally see advantages in it, but also the only disadvantage
> that I can think of is putting multiple meanings on this field.
>
> If we do not treat timestamp as a special value of the config, then I
> cannot use the timestamp field of the record as the compaction value, since
> we will only look into the record header other than the default offset,
> right? Then users wanting to use the timestamp as the compaction value have
> to put that timestamp into the record header with a name, which duplicates
> the field unnecessarily. So to me, without treating it as a special value we
> are doomed to have a duplicated record field.
>
> > Having it this way would jeopardize my own particular use case, as I need
> to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> 52, et cetera)
>
> The issue that I do not understand completely is why you'd keep saying that
> we need to convert it to a String first and then convert it to other types.
> Since the header is organized as:
>
> public interface Header {
>
> String key();
>
> byte[] value();
>
> }
>
>
> Which means that the header value can be of any type. So with your use
> case, why can't you just serialize your incremental version number into a
> byte array directly, whose lexicographic order obeys the version number
> value? I think the default byte serialization mechanism of the integer is
> sufficient for this purpose (assuming that the incremental number is an int).
>
>
>
> Guozhang
>
>
>
>
> On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral  >
> wrote:
>
> >  Hi Guozhang,
> >
> > Thank you very much for the patience in explaining your points, I've
> > learnt quite a bit in researching and experimenting after your replies.
> >
> >
> > bq. I still think it is worth defining `timestamp` as a special
> compaction
> > value
> >
> > I don't personally see advantages in it, but also the only disadvantage
> > that I can think of is putting multiple meanings on this field, which
> does
> > not seem enough to dissuade anyone, so I've added it to the KIP as a
> > compromise.
> > (please also see the pull request in case you want to confirm the
> > implementation matches your idea)
> >
> >
> > bq. Should it be "the record with the highest value will be kept"?
> >
> >
> > That is describing a scenario where the records being compared have the
> > same value, in which case the offset 

Build failed in Jenkins: kafka-trunk-jdk8 #2575

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6376; refactor skip metrics in Kafka Streams

[jason] MINOR: Fix formatting in --new-consumer deprecation warning (#4903)

--
[...truncated 418.68 KB...]

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates STARTED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates PASSED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods STARTED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics STARTED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testControllerMetrics STARTED

kafka.metrics.MetricsTest > testControllerMetrics PASSED

kafka.metrics.MetricsTest > testWindowsStyleTagNames STARTED

kafka.metrics.MetricsTest > testWindowsStyleTagNames PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest >

Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Guozhang Wang
Thanks Luís. The KIP looks good to me. Just one minor point that I left:

`When both records being compared contain a matching "compaction value",
then the record with the highest offset will be kept;`

I understand your intent; it's just that the sentence itself is a bit
misleading. I think what you actually meant to say is:

`When both records being compared contain a matching "compaction value" and
their corresponding byte arrays are considered equal, then the record with
the highest offset will be kept;`



Guozhang



On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral 
wrote:

> Hello Guozhang,
>
> The KIP is now updated to reflect this choice in strategy.
> Please let me know your thoughts there.
>
> Kind Regards,
> Luís
>
> From: Guozhang Wang
> Sent: 23 April 2018 19:32
> To: dev@kafka.apache.org
> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hi Luis,
>
> I think by "generalizing it" we could go beyond numerical values, and
> that's why I suggested we do not need to require that the type serialized
> to the bytes have any numerical semantics, since it has to be serialized to
> a byte array anyways. I understand that for your use case, the intended
> record header compaction value is a number, but imagine if someone else
> wants to compact the same-keyed messages based on some record header
> key-value pair whose value types before serializing to bytes are not
> numbers at all, but just some strings:
>
> key: "A", value: "a1", header: ["bar" -> "a".bytes()],
> key: "A", value: "a2", header: ["bar" -> "c".bytes()],
> key: "A", value: "a3", header: ["bar" -> "b".bytes()],
>
>
> Could we allow them to use that header for compaction as well?
>
>
> Now going back to your use case, for numbers that could be negative: as long
> as users are aware of the requirement and change the default encoding scheme
> when they generate the producer record and set the headers, so that the
> serialized bytes still obey the value ordering, that should be OK (again, as I
> said, we push this responsibility to users to define the right serde
> mechanism, but that seems to be more flexible). For example: -INF serialized
> to 0x, -INF+1 serialized to 0x0001, etc.
>
>
>
> Guozhang
>
>
>
>
>
> On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral
>  > wrote:
>
> > Hello Guozhang,
> >
> > Thanks for the fast reply!
> >
> > As for the matter of the timestamp, it’s now added to the KIP, so I hope
> > this is correctly addressed.
> > Kindly let me know if you would like some adaptions to the concept.
> >
> >
> > bq. The issue that I do not understand completely is why you'd keep saying
> > that we need to convert it to a String first and then convert it to other
> > types.
> >
> > Maybe I’m over-engineering it again, and the problem can be simplified to
> > restricting this to values greater than or equal to zero, which ends up
> > being ok for my own use case...
> > This would then generally guarantee the lexicographic ordering, as you
> say.
> > Is this what you mean? Should I then add this restriction to the KIP?
> >
> > Cheers,
> > Luis
> >
> > From: Guozhang Wang
> > Sent: 23 April 2018 17:55
> > To: dev@kafka.apache.org
> > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hello Luis,
> >
> > Thanks for your email, replying to your points in the following:
> >
> > > I don't personally see advantages in it, but also the only disadvantage
> > that I can think of is putting multiple meanings on this field.
> >
> > If we do not treat timestamp as a special value of the config, then I
> > cannot use the timestamp field of the record as the compaction value,
> since
> > we will only look into the record header other than the default offset,
> > right? Then users wanting to use the timestamp as the compaction value
> have
> > to put that timestamp into the record header with a name, which
> duplicates
> > the field unnecessary. So to me without treating it as a special value we
> > are doomed to have duplicate record field.
> >
> > > Having it this way would jeopardize my own particular use case, as I
> need
> > to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> > 52, et cetera)
> >
> > The issue that I do not understand completely is why you'd keep saying that
> > we need to convert it to a String first and then convert it to other types.
> > Since the header is organized as:
> >
> > public interface Header {
> >
> > String key();
> >
> > byte[] value();
> >
> > }
> >
> >
> > Which means that the header value can be of any type. So with your use
> > case, why can't you just serialize your incremental version number into a
> > byte array directly, whose lexicographic order obeys the version number
> > value? I think the default byte serialization mechanism of the integer is
> > sufficient for this purpose (assuming that the incremental number is an int).
> >
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral
>  > >
> > wrote:

[jira] [Created] (KAFKA-6819) Refactor built-in StreamsMetrics internal implementations

2018-04-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6819:


 Summary: Refactor built-in StreamsMetrics internal implementations
 Key: KAFKA-6819
 URL: https://issues.apache.org/jira/browse/KAFKA-6819
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Our current internal implementations of StreamsMetrics and the different layered 
metrics like StreamsMetricsThreadImpl, TaskMetrics, NodeMetrics, etc. are a bit 
messy nowadays. We could improve on the current situation by doing the 
following:

0. For thread-level metrics, refactor the {{StreamsMetricsThreadImpl}} class to 
{{ThreadMetrics}} such that a) it does not extend from {{StreamsMetricsImpl}} 
but just takes the {{StreamsMetricsImpl}} as a constructor parameter, and b) its 
constructor is replaced with a static {{addAllSensors(threadName)}} that tries 
to register all the thread-level sensors for the given thread name.

1. Add a static function for each of the built-in sensors of the thread-level 
metrics in {{ThreadMetrics}} that relies on the internal 
{{StreamsMetricsConventions}} to get thread-level sensor names. If the sensor 
cannot be found in the internal {{Metrics}} registry, create the sensor on the 
fly.

2.a Add a static {{removeAllSensors(threadName)}} function in {{ThreadMetrics}} 
that tries to de-register all the thread-level metrics for this thread; if 
there are no sensors, it is a no-op. In {{StreamThread#close()}} we will 
trigger this function, and similarly in {{TopologyTestDriver}} we will also 
call it when we close the driver. As a result, the {{ThreadMetrics}} class 
itself would only contain static functions with no member fields at all.

2.b We can consider doing the same for {{TaskMetrics}}, {{NodeMetrics}} and 
{{NamedCacheMetrics}} as well, and add a {{StoreMetrics}} following a similar 
pattern: although these metrics are not accessed outside their enclosing 
classes today, this may change in the future.

3. Then, we only pass {{StreamsMetricsImpl}} around between the internal 
classes, to access the specific sensor whenever trying to record it.
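
As an illustrative sketch only (the extra {{Metrics}} argument and the sensor 
names below are assumptions made for this sketch, not the final design), the 
resulting {{ThreadMetrics}} could roughly look like:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

// Hypothetical sketch of the proposed shape: a static-only holder with no
// member fields. The sensor names stand in for StreamsMetricsConventions.
public final class ThreadMetrics {

    private ThreadMetrics() { }

    // Register all built-in thread-level sensors for the given thread name.
    public static void addAllSensors(Metrics metrics, String threadName) {
        commitSensor(metrics, threadName);
        pollSensor(metrics, threadName);
    }

    // Look the sensor up by its conventional name; create it on the fly if absent.
    public static Sensor commitSensor(Metrics metrics, String threadName) {
        String name = threadName + ".commit";
        Sensor sensor = metrics.getSensor(name);
        return sensor != null ? sensor : metrics.sensor(name);
    }

    public static Sensor pollSensor(Metrics metrics, String threadName) {
        String name = threadName + ".poll";
        Sensor sensor = metrics.getSensor(name);
        return sensor != null ? sensor : metrics.sensor(name);
    }

    // De-register all thread-level sensors for this thread; a no-op if none exist.
    public static void removeAllSensors(Metrics metrics, String threadName) {
        metrics.removeSensor(threadName + ".commit");
        metrics.removeSensor(threadName + ".poll");
    }
}
{code}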



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6820) Improve on StreamsMetrics Public APIs

2018-04-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6820:


 Summary: Improve on StreamsMetrics Public APIs
 Key: KAFKA-6820
 URL: https://issues.apache.org/jira/browse/KAFKA-6820
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Our current `addLatencyAndThroughputSensor` and `addThroughputSensor` are not 
very well designed, and hence not very user friendly for people who want to add 
their own customized sensors. We could consider improving on this feature. Some 
related things to consider:

1. Our internal built-in metrics should be independent of these public APIs, 
which are for user-customized sensors only. See KAFKA-6819 for the related 
description.

2. We could enforce the possible scopeName values, and clearly document the 
sensor hierarchies that the function calls would incur. In this way the library 
can help close users' sensors automatically when the corresponding scope 
(store, task, thread, etc.) is being de-constructed.
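
Purely as a hypothetical sketch (the enum, interface and method names below are 
invented for illustration and are not part of this ticket), a scope-aware 
variant could look like:

{code:java}
import org.apache.kafka.common.metrics.Sensor;

public interface ScopedStreamsMetrics {

    // Restrict the scope to a fixed set of values instead of a free-form scopeName.
    enum Scope { THREAD, TASK, PROCESSOR_NODE, STORE, CACHE }

    // Register a user sensor under an explicit scope and entity.
    Sensor addLatencySensor(Scope scope, String entityName, String operationName);

    Sensor addThroughputSensor(Scope scope, String entityName, String operationName);

    // With the scope explicit, the library can close every sensor registered
    // under (scope, entityName) when that scope is de-constructed.
    void removeAllSensors(Scope scope, String entityName);
}
{code}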



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #2576

2018-04-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6670: Implement a Scala wrapper library for Kafka Streams

--
[...truncated 416.18 KB...]

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsShiftPlus STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsShiftPlus PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLatest STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLatest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNewConsumerExistingTopic STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNewConsumerExistingTopic PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsShiftByLowerThanEarliest STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsShiftByLowerThanEarliest PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsByDuration PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToLocalDateTime 
PASSED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions STARTED

kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsToEarliestOnTopicsAndPartitions PASSED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
STARTED

kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToEarliestOnTopics 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithThrottleOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfNoArgs PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowBrokersListWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumExecuteOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumGenerateOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersAndTopicsOptions PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowThrottleWithVerifyOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldUseDefaultsIfEnabled 
PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThrottleOptionOnExecute STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldAllowThrottleOptionOnExecute PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithBrokers STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithBrokers PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithTopicsOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowExecuteWithTopicsOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldCorrectlyParseValidMinimumVerifyOptions

Jenkins build is back to normal : kafka-trunk-jdk10 #43

2018-04-23 Thread Apache Jenkins Server
See 




RE: RE: [DISCUSS] KIP-280: Enhanced log compaction

2018-04-23 Thread Luís Cabral
That is definitely clearer, KIP updated!

From: Guozhang Wang
Sent: 23 April 2018 23:44
To: dev@kafka.apache.org
Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction

Thanks Luís. The KIP looks good to me. Just one minor point that I left:

`When both records being compared contain a matching "compaction value",
then the record with the highest offset will be kept;`

I understand your intent; it's just that the sentence itself is a bit
misleading. I think what you actually meant to say is:

`When both records being compared contain a matching "compaction value" and
their corresponding byte arrays are considered equal, then the record with
the highest offset will be kept;`



Guozhang



On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral 
wrote:

> Hello Guozhang,
>
> The KIP is now updated to reflect this choice in strategy.
> Please let me know your thoughts there.
>
> Kind Regards,
> Luís
>
> From: Guozhang Wang
> Sent: 23 April 2018 19:32
> To: dev@kafka.apache.org
> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hi Luis,
>
> I think by "generalizing it" we could go beyond numerical values, and
> that's why I suggested we do not need to require that the type serialized
> to the bytes have any numerical semantics, since it has to be serialized to
> a byte array anyways. I understand that for your use case, the intended
> record header compaction value is a number, but imagine if someone else
> wants to compact the same-keyed messages based on some record header
> key-value pair whose value types before serializing to bytes are not
> numbers at all, but just some strings:
>
> key: "A", value: "a1", header: ["bar" -> "a".bytes()],
> key: "A", value: "a2", header: ["bar" -> "c".bytes()],
> key: "A", value: "a3", header: ["bar" -> "b".bytes()],
>
>
> Could we allow them to use that header for compaction as well?
>
>
> Now going back to your use case, for numbers that could be negative: as long
> as users are aware of the requirement and change the default encoding scheme
> when they generate the producer record and set the headers, so that the
> serialized bytes still obey the value ordering, that should be OK (again, as I
> said, we push this responsibility to users to define the right serde
> mechanism, but that seems to be more flexible). For example: -INF serialized
> to 0x, -INF+1 serialized to 0x0001, etc.
>
>
>
> Guozhang
>
>
>
>
>
> On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral
>  > wrote:
>
> > Hello Guozhang,
> >
> > Thanks for the fast reply!
> >
> > As for the matter of the timestamp, it’s now added to the KIP, so I hope
> > this is correctly addressed.
> > Kindly let me know if you would like some adaptions to the concept.
> >
> >
> > bq. The issue that I do not understand completely is why you'd keep saying
> > that we need to convert it to a String first and then convert it to other
> > types.
> >
> > Maybe I’m over-engineering it again, and the problem can be simplified to
> > restricting this to values greater than or equal to zero, which ends up
> > being ok for my own use case...
> > This would then generally guarantee the lexicographic ordering, as you
> say.
> > Is this what you mean? Should I then add this restriction to the KIP?
> >
> > Cheers,
> > Luis
> >
> > From: Guozhang Wang
> > Sent: 23 April 2018 17:55
> > To: dev@kafka.apache.org
> > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hello Luis,
> >
> > Thanks for your email, replying to your points in the following:
> >
> > > I don't personally see advantages in it, but also the only disadvantage
> > that I can think of is putting multiple meanings on this field.
> >
> > If we do not treat timestamp as a special value of the config, then I
> > cannot use the timestamp field of the record as the compaction value,
> since
> > we will only look into the record header other than the default offset,
> > right? Then users wanting to use the timestamp as the compaction value
> have
> > to put that timestamp into the record header with a name, which duplicates
> > the field unnecessarily. So to me, without treating it as a special value we
> > are doomed to have a duplicated record field.
> >
> > > Having it this way would jeopardize my own particular use case, as I
> need
> > to have an incremental number representing the version (i.e.: 1, 2, 3, 5,
> > 52, et cetera)
> >
> > The issue that I do not understand completely is why you'd keep saying that
> > we need to convert it to a String first and then convert it to other types.
> > Since the header is organized as:
> >
> > public interface Header {
> >
> > String key();
> >
> > byte[] value();
> >
> > }
> >
> >
> > Which means that the header value can be of any types. So with your use
> > case why can't you just serialize your incremental version number into a
> > byte array directly, whose lexico-order obeys the version number value??
> I
> > think the default byte serialization mechanism of the integer is
> suffic

Jenkins build is back to normal : kafka-trunk-jdk7 #3366

2018-04-23 Thread Apache Jenkins Server
See 




Can I get permission to create KIP?

2018-04-23 Thread qingjun wu
Hi Dear,

I found something that Kafka should improve. Can I get permission for this?
I am working for Microsoft, actually. If you need me to log in as Microsoft,
I can do that too. But please advise.

My Login ID: WadeWu
My Login Email: wuqing...@gmail.com

Thank you!

-- 
Best Regards
 吴清俊|Wade Wu


Re: Can I get permission to create KIP?

2018-04-23 Thread Guozhang Wang
Hello Qingjun,

I have added you to the Kafka wiki space.

Cheers,
Guozhang


On Mon, Apr 23, 2018 at 4:21 PM, qingjun wu  wrote:

> Hi Dear,
>
> I found something that Kafka should improve. Can I get permission for this?
> I am working for Microsoft, actually. If you need me to log in as Microsoft,
> I can do that too. But please advise.
>
> My Login ID: WadeWu
> My Login Email: wuqing...@gmail.com
>
> Thank you!
>
> --
> Best Regards
>  吴清俊|Wade Wu
>



-- 
-- Guozhang


[jira] [Created] (KAFKA-6821) The producer attempted to use a producer id which is not currently assigned to its transactional id

2018-04-23 Thread RandySun (JIRA)
RandySun created KAFKA-6821:
---

 Summary: The producer attempted to use a producer id which is not 
currently assigned to its transactional id 
 Key: KAFKA-6821
 URL: https://issues.apache.org/jira/browse/KAFKA-6821
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: Centos 7
Reporter: RandySun


I use Kafka Streams to join two KStreams; however, I found an error stack trace 
in my application log, as below:
{code:java}
Aborting producer batches due to fatal error 
org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id 
at 
org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037)
 
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
 
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id 
{code}
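
For context, a minimal sketch (not the reporter's actual code) of the kind of 
setup described above: two KStreams joined with exactly-once processing 
enabled. The topic names, serdes and join window are assumptions made here for 
illustration.

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class JoinExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-join-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Exactly-once enables transactional producers, which is where
        // InvalidPidMappingException can surface.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        // Windowed join of the two streams; joined records go to an output topic.
        left.join(right,
                  (leftValue, rightValue) -> leftValue + "|" + rightValue,
                  JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))
            .to("joined-output");

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}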
 

There is no evidence showing any other error in my application, and the Kafka 
controller.log says:

{code:java}
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Preferred replicas by broker 
Map(2 -> Map(__consumer_offsets-19 -> Vector(2, 3, 1), 
com.randy.Demo1-KSTREAM-FLATMAP-01-repartition-1 -> Vector(2)

[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
broker 3 is 0.0 (kafka.controller.KafkaController)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)