Build failed in Jenkins: kafka-trunk-jdk7 #2256

2017-05-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-5273: Make KafkaConsumer.committed query the server for all

--
[...truncated 898.98 KB...]
kafka.log.ProducerStateManagerTest > 
testNonTransactionalAppendWithOngoingTransaction PASSED

kafka.log.ProducerStateManagerTest > testSkipSnapshotIfOffsetUnchanged STARTED

kafka.log.ProducerStateManagerTest > testSkipSnapshotIfOffsetUnchanged PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[0] STARTED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[0] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleaningNestedMessagesWithMultipleVersions[0] STARTED

kafka.log.LogCleanerIntegrationTest > 
testCleaningNestedMessagesWithMultipleVersions[0] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[0] STARTED

kafka.log.LogCleanerIntegrationTest > cleanerTest[0] PASSED

kafka.log.LogCleanerIntegrationTest > testCleanerWithMessageFormatV0[0] STARTED

kafka.log.LogCleanerIntegrationTest > testCleanerWithMessageFormatV0[0] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[1] STARTED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[1] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleaningNestedMessagesWithMultipleVersions[1] STARTED

kafka.log.LogCleanerIntegrationTest > 
testCleaningNestedMessagesWithMultipleVersions[1] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[1] STARTED

kafka.log.LogCleanerIntegrationTest > cleanerTest[1] PASSED

kafka.log.LogCleanerIntegrationTest > testCleanerWithMessageFormatV0[1] STARTED

kafka.log.LogCleanerIntegrationTest > testCleanerWithMessageFormatV0[1] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[2] STARTED

kafka.log.LogCleanerIntegrationTest > 
testCleansCombinedCompactAndDeleteTopic[2] PASSED

kafka.log.LogCleanerIntegrationTest > 
testCleaningNestedMessagesWithMultipleVersions[2] STARTED

kafka.log.LogClea

[GitHub] kafka pull request #3133: MINOR: GroupCoordinator can append with group lock

2017-05-24 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3133

MINOR: GroupCoordinator can append with group lock



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka 
minor-replica-manager-append-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3133






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [Vote] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Damian Guy
+1 to what Xavier said

On Wed, 24 May 2017 at 06:45 Xavier Léauté  wrote:

> I don't think we should wait for entries from each stream, since that might
> limit the usefulness of the cogroup operator. There are instances where it
> can be useful to compute something based on data from one or more streams,
> without having to wait for all the streams to produce something for the
> group. In the example I gave in the discussion, it is possible to compute
> impression/auction statistics without having to wait for click data, which
> can typically arrive several minutes late.
>
> We could have a separate discussion around adding inner / outer modifiers
> to each of the streams to decide which fields are optional / required
> before sending updates if we think that might be useful.
>
>
>
> On Tue, May 23, 2017 at 6:28 PM Guozhang Wang  wrote:
>
> > The proposal LGTM, +1
> >
> > One question I have is about when to send the record to the resulted
> KTable
> > changelog. For example in your code snippet in the wiki page, before you
> > see the end result of
> >
> > 1L, Customer[
> >
> >   cart:{Item[no:01], Item[no:03], Item[no:04]},
> >   purchases:{Item[no:07], Item[no:08]},
> >   wishList:{Item[no:11]}
> >   ]
> >
> >
> > You will first see
> >
> > 1L, Customer[
> >
> >   cart:{Item[no:01]},
> >   purchases:{},
> >   wishList:{}
> >   ]
> >
> > 1L, Customer[
> >
> >   cart:{Item[no:01]},
> >   purchases:{Item[no:07],Item[no:08]},
> >
> >   wishList:{}
> >   ]
> >
> > 1L, Customer[
> >
> >   cart:{Item[no:01]},
> >   purchases:{Item[no:07],Item[no:08]},
> >
> >   wishList:{}
> >   ]
> >
> > ...
> >
> >
> > I'm wondering if it makes more sense to only start sending the update if
> > the corresponding agg-key has seen at least one input from each of the
> > input streams? Maybe it is out of the scope of this KIP and we can make
> it a
> > more general discussion in a separate one.
> >
> >
> > Guozhang
> >
> >
> > On Fri, May 19, 2017 at 8:37 AM, Xavier Léauté 
> > wrote:
> >
> > > Hi Kyle, I left a few more comments in the discussion thread, if you
> > > wouldn't mind taking a look
> > >
> > > On Fri, May 19, 2017 at 5:31 AM Kyle Winkelman <
> winkelman.k...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to start the vote on KIP-150.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+
> > > Kafka-Streams+Cogroup
> > > >
> > > > Thanks,
> > > > Kyle
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
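
The intermediate updates described above can be modeled in plain Java. In the
sketch below, Customer and Item strings are illustrative stand-ins rather than
Kafka Streams classes; the only point is that a co-grouped aggregate emits a
partial result after each input record instead of waiting for every stream.

import java.util.ArrayList;
import java.util.List;

public class CogroupUpdateSketch {

    static class Customer {
        final List<String> cart = new ArrayList<>();
        final List<String> purchases = new ArrayList<>();
        final List<String> wishList = new ArrayList<>();

        @Override
        public String toString() {
            return "Customer[cart:" + cart + ", purchases:" + purchases
                    + ", wishList:" + wishList + "]";
        }
    }

    public static void main(String[] args) {
        Customer agg = new Customer();

        // Each record arriving from one of the co-grouped streams updates the
        // aggregate, and the updated (partial) aggregate is emitted right away.
        agg.cart.add("Item[no:01]");
        System.out.println("1L, " + agg);   // cart only; purchases and wishList still empty

        agg.purchases.add("Item[no:07]");
        agg.purchases.add("Item[no:08]");
        System.out.println("1L, " + agg);   // still a partial result

        agg.wishList.add("Item[no:11]");
        agg.cart.add("Item[no:03]");
        agg.cart.add("Item[no:04]");
        System.out.println("1L, " + agg);   // the full result quoted in the thread
    }
}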


Re: [Vote] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Damian Guy
Also, +1 for the KIP

On Wed, 24 May 2017 at 08:57 Damian Guy  wrote:

> +1 to what Xavier said
>
> On Wed, 24 May 2017 at 06:45 Xavier Léauté  wrote:
>
>> I don't think we should wait for entries from each stream, since that
>> might
>> limit the usefulness of the cogroup operator. There are instances where it
>> can be useful to compute something based on data from one or more streams,
>> without having to wait for all the streams to produce something for the
>> group. In the example I gave in the discussion, it is possible to compute
>> impression/auction statistics without having to wait for click data, which
>> can typically arrive several minutes late.
>>
>> We could have a separate discussion around adding inner / outer modifiers
>> to each of the streams to decide which fields are optional / required
>> before sending updates if we think that might be useful.
>>
>>
>>
>> On Tue, May 23, 2017 at 6:28 PM Guozhang Wang  wrote:
>>
>> > The proposal LGTM, +1
>> >
>> > One question I have is about when to send the record to the resulted
>> KTable
>> > changelog. For example in your code snippet in the wiki page, before you
>> > see the end result of
>> >
>> > 1L, Customer[
>> >
>> >   cart:{Item[no:01], Item[no:03], Item[no:04]},
>> >   purchases:{Item[no:07], Item[no:08]},
>> >   wishList:{Item[no:11]}
>> >   ]
>> >
>> >
>> > You will first see
>> >
>> > 1L, Customer[
>> >
>> >   cart:{Item[no:01]},
>> >   purchases:{},
>> >   wishList:{}
>> >   ]
>> >
>> > 1L, Customer[
>> >
>> >   cart:{Item[no:01]},
>> >   purchases:{Item[no:07],Item[no:08]},
>> >
>> >   wishList:{}
>> >   ]
>> >
>> > 1L, Customer[
>> >
>> >   cart:{Item[no:01]},
>> >   purchases:{Item[no:07],Item[no:08]},
>> >
>> >   wishList:{}
>> >   ]
>> >
>> > ...
>> >
>> >
>> > I'm wondering if it makes more sense to only start sending the update if
>> > the corresponding agg-key has seen at least one input from each of the
>> > input streams? Maybe it is out of the scope of this KIP and we can make
>> it a
>> > more general discussion in a separate one.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, May 19, 2017 at 8:37 AM, Xavier Léauté 
>> > wrote:
>> >
>> > > Hi Kyle, I left a few more comments in the discussion thread, if you
>> > > wouldn't mind taking a look
>> > >
>> > > On Fri, May 19, 2017 at 5:31 AM Kyle Winkelman <
>> winkelman.k...@gmail.com
>> > >
>> > > wrote:
>> > >
>> > > > Hello all,
>> > > >
>> > > > I would like to start the vote on KIP-150.
>> > > >
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+
>> > > Kafka-Streams+Cogroup
>> > > >
>> > > > Thanks,
>> > > > Kyle
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>


Jenkins build is back to normal : kafka-trunk-jdk7 #2257

2017-05-24 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #3134: KAFKA-5315: should not subtract for empty key in K...

2017-05-24 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/3134

KAFKA-5315: should not subtract for empty key in KTable.aggregate/reduce



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-5315-fix-at-least-once-violation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3134


commit e7d26bc45c284c71b8930bc4baba11ee64e9c08f
Author: Matthias J. Sax 
Date:   2017-05-24T07:59:07Z

KAFKA-5315: should not subtract for empty key in KTable.aggregate/reduce




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk8 #1583

2017-05-24 Thread Apache Jenkins Server
See 




[jira] [Updated] (KAFKA-5315) Streams exception w/ partially processed record corrupts state store

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5315:
---
Status: Patch Available  (was: Open)

PR: https://github.com/apache/kafka/pull/3134

> Streams exception w/ partially processed record corrupts state store
> 
>
> Key: KAFKA-5315
> URL: https://issues.apache.org/jira/browse/KAFKA-5315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Mathieu Fenniak
>Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is 
> inserted into the state store before being forwarded to downstream 
> processors, and may persist in the state store even if downstream processing 
> fails due to an exception. The persisted state store record may later affect 
> any attempt to restart processing after the exception.
> Specific example series of events in a simple topology (sketched after the 
> steps below): a single KTable source, group by a field in the value, 
> aggregate that adds up another field, output to a topic --
> 1. A new record (record A) is received by the source KTable, and put in the 
> KTable RocksDB state store.
> 2. While processing record A, an exception happens preventing producing to 
> Kafka. (eg, a TimeoutException Failed to
> update metadata after 6 ms).
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local 
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the 
> task.
> 7. New thread initializes its state store for the KTable, but it's on the 
> same host as the original thread, so it still contains the k/v for record A.
> 8. New thread resumes consuming at the last committed offset, which is before 
> record A.
> 9. When processing record A, the new thread reads the value that was written 
> to the state store in step #1 by record A's key.
> 10. The repartition map receives a Change with both an oldValue and a
> newValue, and forwards a Change(null, v) and Change(v, null)
> 11. The aggregation ends up both subtracting and adding the value of record 
> A, resulting in an incorrect & persistent output.
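
A minimal sketch of the topology described in the steps above, written against
the 0.10.2-era Streams DSL. The topic names, store names, and the
comma-separated value format are hypothetical, chosen only to make the sketch
self-contained; it illustrates the shape of the topology, not the reporter's
actual application.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class AggregateTopologySketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // KTable source; values are assumed to be "groupField,amount" strings.
        KTable<String, String> source =
                builder.table(Serdes.String(), Serdes.String(), "input-topic", "source-store");

        // Group by a field in the value, then add up another field. The
        // subtractor is what re-applies the stale stored value in steps 10-11.
        KTable<String, Long> sums = source
                .groupBy((key, value) -> KeyValue.pair(value.split(",")[0],
                                                       Long.parseLong(value.split(",")[1])),
                         Serdes.String(), Serdes.Long())
                .reduce((agg, v) -> agg + v,   // adder
                        (agg, v) -> agg - v,   // subtractor
                        "sum-store");

        sums.to(Serdes.String(), Serdes.Long(), "output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-topology-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder, props).start();
    }
}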



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3021: KAFKA-5006: change exception path

2017-05-24 Thread enothereska
Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/3021


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5006) KeyValueStore.put may throw exception unrelated to the current put attempt

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022570#comment-16022570
 ] 

ASF GitHub Bot commented on KAFKA-5006:
---

Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/3021


> KeyValueStore.put may throw exception unrelated to the current put attempt
> --
>
> Key: KAFKA-5006
> URL: https://issues.apache.org/jira/browse/KAFKA-5006
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Eno Thereska
>Priority: Blocker
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> It is possible for {{KeyValueStore.put(K key, V value)}} to throw an 
> exception unrelated to the store in question. Due to [the way that 
> {{RecordCollector.send}} is currently 
> implemented|https://github.com/confluentinc/kafka/blob/3.2.x/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L76]
> the exception thrown would be for any previous record produced by the stream 
> task, possibly for a completely unrelated topic the same task is producing to.
> This can be very confusing for someone attempting to correctly handle 
> exceptions thrown by put(), as they would not be able to add any additional 
> debugging information to understand the operation that caused the problem. 
> Worse, such logging would likely confuse the user, since they might mislead 
> themselves into thinking the changelog record created by calling put() caused 
> the problem.
> Given that there is likely no way for the user to recover from an exception 
> thrown by an unrelated produce request, it is questionable whether we should 
> even try to raise the exception at this level. A short-term fix would be to 
> simply delegate this exception to the uncaught exception handler.
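
For reference, the uncaught exception handler mentioned above is the one an
application registers on its KafkaStreams instance. A minimal sketch follows;
the application id, bootstrap servers, and topic names are placeholders.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class UncaughtHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uncaught-handler-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);

        // An exception that kills a stream thread (for example a produce failure
        // raised from an unrelated send) would surface here rather than at the
        // put() call site.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

        streams.start();
    }
}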



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5006) KeyValueStore.put may throw exception unrelated to the current put attempt

2017-05-24 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-5006:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

> KeyValueStore.put may throw exception unrelated to the current put attempt
> --
>
> Key: KAFKA-5006
> URL: https://issues.apache.org/jira/browse/KAFKA-5006
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Eno Thereska
>Priority: Blocker
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> It is possible for {{KeyValueStore.put(K key, V value)}} to throw an 
> exception unrelated to the store in question. Due to [the way that 
> {{RecordCollector.send}} is currently 
> implemented|https://github.com/confluentinc/kafka/blob/3.2.x/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L76]
> the exception thrown would be for any previous record produced by the stream 
> task, possibly for a completely unrelated topic the same task is producing to.
> This can be very confusing for someone attempting to correctly handle 
> exceptions thrown by put(), as they would not be able to add any additional 
> debugging information to understand the operation that caused the problem. 
> Worse, such logging would likely confuse the user, since they might mislead 
> themselves into thinking the changelog record created by calling put() caused 
> the problem.
> Given that there is likely no way for the user to recover from an exception 
> thrown by an unrelated produce request, it is questionable whether we should 
> even try to raise the exception at this level. A short-term fix would be to 
> simply delegate this exception to the uncaught exception handler.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3135: KAFKA-5314: exception handling and cleanup for sta...

2017-05-24 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/3135

KAFKA-5314: exception handling and cleanup for state stores



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka exceptions-stores-KAFKA-5314

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3135


commit 7be5deb1be681e7b6c2bae328379a1540d237bc9
Author: Eno Thereska 
Date:   2017-05-23T17:02:13Z

Cleanup and fix pass I

commit 7a4e18a7ebfd1206433ea89c1879cc7c2afd2b83
Author: Eno Thereska 
Date:   2017-05-23T17:55:51Z

Caching tests

commit c6f57b29d712916149a4264aab27973e764c83b5
Author: Eno Thereska 
Date:   2017-05-24T09:02:48Z

Various fixes and cleanup




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5314) Improve exception handling for state stores

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022574#comment-16022574
 ] 

ASF GitHub Bot commented on KAFKA-5314:
---

GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/3135

KAFKA-5314: exception handling and cleanup for state stores



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka exceptions-stores-KAFKA-5314

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3135


commit 7be5deb1be681e7b6c2bae328379a1540d237bc9
Author: Eno Thereska 
Date:   2017-05-23T17:02:13Z

Cleanup and fix pass I

commit 7a4e18a7ebfd1206433ea89c1879cc7c2afd2b83
Author: Eno Thereska 
Date:   2017-05-23T17:55:51Z

Caching tests

commit c6f57b29d712916149a4264aab27973e764c83b5
Author: Eno Thereska 
Date:   2017-05-24T09:02:48Z

Various fixes and cleanup




> Improve exception handling for state stores
> ---
>
> Key: KAFKA-5314
> URL: https://issues.apache.org/jira/browse/KAFKA-5314
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.0.0
>
>
> RocksDbStore.java throws a mix of exceptions like StreamsException and 
> ProcessorStateException. That needs to be made consistent. 
> Also the exceptions thrown are not documented in the KeyValueStore interface. 
> All the stores (RocksDb, InMemory) need to have consistent exception handling.
> Today a store error is fatal and halts the stream thread that hits the 
> exception. We could explore alternatives, like halting only the part of the 
> topology that passes through the faulty store, i.e., failing tasks, not the 
> entire thread.
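
From the caller's side the current situation looks like the sketch below: code
that touches a store has to be prepared for StreamsException, which also covers
ProcessorStateException. The store name is a hypothetical placeholder.

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreWriteProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(String key, String value) {
        try {
            store.put(key, value);
        } catch (StreamsException e) {
            // ProcessorStateException is a subclass of StreamsException, so this
            // covers both exception types mentioned above. A store error is
            // currently fatal for the stream thread; add context and rethrow.
            throw new StreamsException("Failed to write key " + key + " to my-store", e);
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}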



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3127: Add sleep between empty polls to avoid burning CPU

2017-05-24 Thread felixgborrego
Github user felixgborrego closed the pull request at:

https://github.com/apache/kafka/pull/3127


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
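
At the application level, the idea in the PR title (sleeping between empty
polls) looks roughly like the sketch below. This is an illustration rather than
the PR's actual change; the broker list, topic, group id, and backoff interval
are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackoffPollLoop {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "backoff-poll-sketch");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                if (records.isEmpty()) {
                    // Back off briefly so a quiet topic does not spin the CPU.
                    Thread.sleep(100);
                    continue;
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println(record.offset() + ": " + record.value().length + " bytes");
                }
            }
        }
    }
}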


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022604#comment-16022604
 ] 

ASF GitHub Bot commented on KAFKA-3159:
---

Github user felixgborrego closed the pull request at:

https://github.com/apache/kafka/pull/3127


> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
> Fix For: 0.9.0.1
>
> Attachments: Memory-profile-patched-client.png, Screen Shot 
> 2016-02-01 at 11.09.32 AM.png
>
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords<Void, byte[]> records = null;
> try {
> // Here timeout is about 10 seconds, so it is pretty big.
> records = consumer.poll(timeout);
> } catch (Exception e) {
> // This never hits for us
> logger.error("Exception polling Kafka ", e);
> records = null;
> }
> if (records != null) {
> for (ConsumerRecord<Void, byte[]> record : records) {
> // The handler puts the byte array on a very fast ring buffer
> // so it barely takes any time.
> handler.handleMessage(ByteBuffer.wrap(record.value()));
> }
> }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handing such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the cpu usa

[jira] [Created] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)
Ma Tianchi created KAFKA-5319:
-

 Summary: Add a tool to make cluster replica and leader balance
 Key: KAFKA-5319
 URL: https://issues.apache.org/jira/browse/KAFKA-5319
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 0.10.2.1
Reporter: Ma Tianchi
 Attachments: ClusterBalanceCommand.scala

When a new broker is added to a cluster, it initially hosts no topics. When a 
topic is created with the console command without 'replicaAssignmentOpt', Kafka 
uses AdminUtils.assignReplicasToBrokers to compute the replica assignment. Even 
if that assignment is balanced at creation time, the replica distribution 
degrades as more brokers are added to the cluster. 
'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
number of topics this is not easy. At topic creation time, Kafka also chooses a 
preferred replica leader, placed at the first position of the AR, to balance 
leadership, but that balance is destroyed when the cluster changes. Partition 
reassignment via 'kafka-reassign-partitions.sh' may also destroy leader 
balance, because the user can change the AR of a partition; the cluster may 
then be unbalanced even though Kafka considers leader balance fine as long as 
every leader is the first replica in the AR.
So we created a tool that balances the number of replicas and the number of 
leaders on every broker. It uses an algorithm to compute a balanced replica and 
leader reassignment, then uses ReassignPartitionsCommand to actually rebalance 
the cluster.
It can be used when brokers are added or the cluster is otherwise unbalanced. 
It does not handle moving replicas from a dead broker to a live one; it only 
balances replicas and leaders across live brokers.
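
To illustrate the kind of assignment such a tool has to compute, here is a toy
round-robin sketch (not the attached ClusterBalanceCommand): replicas are
spread evenly across brokers and the starting broker rotates per partition, so
preferred leaders are spread as well. A real tool would turn output like this
into input for ReassignPartitionsCommand.

import java.util.ArrayList;
import java.util.List;

public class BalancedAssignmentSketch {

    // Assign `partitions` partitions with the given replication factor across
    // brokers 0..brokerCount-1, rotating the starting broker per partition so
    // both replica counts and first-replica (preferred leader) counts stay even.
    static List<List<Integer>> assign(int partitions, int replicationFactor, int brokerCount) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % brokerCount);
            }
            assignment.add(replicas); // replicas.get(0) is the preferred leader
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(6, 2, 3);
        for (int p = 0; p < assignment.size(); p++) {
            System.out.println("partition " + p + " -> replicas " + assignment.get(p));
        }
    }
}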



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3136: KAFKA-5319 Add a tool to make cluster replica and ...

2017-05-24 Thread MarkTcMA
GitHub user MarkTcMA opened a pull request:

https://github.com/apache/kafka/pull/3136

KAFKA-5319 Add a tool to make cluster replica and leader balance

As [KAFKA-5319](https://issues.apache.org/jira/browse/KAFKA-5319) describes, 
Kafka has no tool to balance the number of replicas or leaders across the 
cluster, so I wrote one.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MarkTcMA/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3136


commit de71981cf6ca42f39ed1fc44ef34f698afcf6e60
Author: 马天池 
Date:   2017-05-24T02:49:32Z

add a new command which make replica and leader on broker balanced

commit 19a455e4eb38d6f992466901c0466e2aa8aaa36f
Author: 马天池 
Date:   2017-05-24T06:31:32Z

no message

commit fec1f099bce34158a18062e9f6d6245863c6d2e5
Author: 马天池 
Date:   2017-05-24T07:55:26Z

no message

commit 6af378a777c450193c279a3b8b995bfccea2a55f
Author: Ma Tianchi 
Date:   2017-05-24T08:03:16Z

a command to make replica and leader balanced

commit 6050af726f30ffbb3e7281e135bbdb2a57e94c80
Author: Ma Tianchi 
Date:   2017-05-24T10:04:18Z

Add a tool to make cluster replica and leader balance




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022687#comment-16022687
 ] 

ASF GitHub Bot commented on KAFKA-5319:
---

GitHub user MarkTcMA opened a pull request:

https://github.com/apache/kafka/pull/3136

KAFKA-5319 Add a tool to make cluster replica and leader balance

As [KAFKA-5319](https://issues.apache.org/jira/browse/KAFKA-5319) describes, 
Kafka has no tool to balance the number of replicas or leaders across the 
cluster, so I wrote one.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MarkTcMA/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3136


commit de71981cf6ca42f39ed1fc44ef34f698afcf6e60
Author: 马天池 
Date:   2017-05-24T02:49:32Z

add a new command which make replica and leader on broker balanced

commit 19a455e4eb38d6f992466901c0466e2aa8aaa36f
Author: 马天池 
Date:   2017-05-24T06:31:32Z

no message

commit fec1f099bce34158a18062e9f6d6245863c6d2e5
Author: 马天池 
Date:   2017-05-24T07:55:26Z

no message

commit 6af378a777c450193c279a3b8b995bfccea2a55f
Author: Ma Tianchi 
Date:   2017-05-24T08:03:16Z

a command to make replica and leader balanced

commit 6050af726f30ffbb3e7281e135bbdb2a57e94c80
Author: Ma Tianchi 
Date:   2017-05-24T10:04:18Z

Add a tool to make cluster replica and leader balance




> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi resolved KAFKA-5319.
---
Resolution: Resolved

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Kyle Winkelman
I allow defining a single window/sessionwindow one time, when you make the
cogroup call from a KGroupedStream. From then on you are using the cogroup
call from within CogroupedKStream, which doesn't accept any additional
windows/sessionwindows.

Is this what you meant by your question or did I misunderstand?

On May 23, 2017 9:33 PM, "Guozhang Wang"  wrote:

Another question that came to me is on "window alignment": from the KIP it
seems you are allowing users to specify a (potentially different) window
spec in each co-grouped input stream. So if these window specs are
different how should we "align" them with different input streams? I think
it is more natural to only specify one window spec in the

KTable CogroupedKStream#aggregate(Windows);


And remove it from the cogroup() functions. WDYT?


Guozhang

On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang  wrote:

> Thanks for the proposal Kyle, this is a quite common use case to support
> such multi-way table join (i.e. N source tables with N aggregate func)
with
> a single store and N+1 serdes, I have seen lots of people using the
> low-level PAPI to achieve this goal.
>
>
> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman  > wrote:
>
>> I like your point about not handling other cases such as count and
reduce.
>>
>> I think that reduce may not make sense because reduce assumes that the
>> input values are the same as the output values. With cogroup there may be
>> multiple different input types and then your output type can't be multiple
>> different things. In the case where you have all matching value types you
>> can do KStreamBuilder#merge followed by the reduce.
>>
>> As for count I think it is possible to call count on all the individual
>> grouped streams and then do joins. Otherwise we could maybe make a
special
>> call in groupedstream for this case. Because in this case we don't need to
>> do type checking on the values. It could be similar to the current count
>> methods but accept varargs of additional grouped streams as well and
>> make
>> sure they have a key type of K.
>>
>> The way I have put the KIP together is to ensure that we do type
checking.
>> I don't see a way we could group them all first and then make a call to
>> count, reduce, or aggregate because with aggregate they would need to
pass
>> a list of aggregators and we would have no way of type checking that they
>> match the grouped streams.
>>
>> Thanks,
>> Kyle
>>
>> On May 19, 2017 11:42 AM, "Xavier Léauté"  wrote:
>>
>> > Sorry to jump on this thread so late. I agree this is a very useful
>> > addition and wanted to provide an additional use-case and some more
>> > comments.
>> >
>> > This is actually a very common analytics use-case in the ad-tech
>> industry.
>> > The typical setup will have an auction stream, an impression stream,
>> and a
>> > click stream. Those three streams need to be combined to compute
>> aggregate
>> > statistics (e.g. impression statistics, and click-through rates), since
>> > most of the attributes of interest are only present in the auction stream.
>> >
>> > A simple way to do this is to co-group all the streams by the auction
>> key,
>> > and process updates to the co-group as events for each stream come in,
>> > keeping only one value from each stream before sending downstream for
>> > further processing / aggregation.
>> >
>> > One could view the result of that co-group operation as a "KTable" with
>> > multiple values per key. The key being the grouping key, and the values
>> > consisting of one value per stream.
>> >
>> > What I like about Kyle's approach is that it allows elegant co-grouping of
>> > multiple streams without having to worry about the number of streams,
>> and
>> > avoids dealing with Tuple types or other generic interfaces that could
>> get
>> > messy if we wanted to preserve all the value types in the resulting
>> > co-grouped stream.
>> >
>> > My only concern is that we only allow the cogroup + aggregate combined
>> > operation. This forces the user to build their own tuple serialization
>> > format if they want to preserve the individual input stream values as a
>> > group. It also deviates quite a bit from our approach in KGroupedStream
>> > which offers other operations, such as count and reduce, which should
>> also
>> > be applicable to a co-grouped stream.
>> >
>> > Overall I still think this is a really useful addition, but I feel we
>> > haven't spend much time trying to explore alternative DSLs that could
>> maybe
>> > generalize better or match our existing syntax more closely.
>> >
>> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman > >
>> > wrote:
>> >
>> > > Eno, is there anyone else that is an expert in the kafka streams
realm
>> > that
>> > > I should reach out to for input?
>> > >
>> > > I believe Damian Guy is still planning on reviewing this more in
depth
>> > so I
>> > > will wait for his inputs before continuing.
>> > >
>> > > On May 9, 2017 7:30 AM, "Eno Thereska" 
>> wrote:
>> > >
>> > > > Thanks Kyle, good arg

[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022695#comment-16022695
 ] 

Michal Borowiecki commented on KAFKA-5319:
--

[~markTC], shouldn't this be in "Patch Available" instead of "Resolved" status 
until the PR is merged?

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5320) Update produce/fetch throttle time metrics for any request throttle

2017-05-24 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-5320:
-

 Summary: Update produce/fetch throttle time metrics for any 
request throttle
 Key: KAFKA-5320
 URL: https://issues.apache.org/jira/browse/KAFKA-5320
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 0.11.0.0


From KIP-124:

{quote}
Producers and consumers currently expose average and maximum producer/fetch 
request throttle time as JMX metrics. These metrics will be updated to reflect 
total throttle time for the producer or consumer including byte-rate throttling 
and request time throttling for all requests of the producer/consumer. 
{quote}

Missed this during the request quota implementation.
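
For reference, these throttle-time metrics are client-side and can also be read
programmatically. A minimal sketch follows; it filters producer metrics by name
rather than hard-coding metric group names, and the bootstrap servers and
serializers are placeholders.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ThrottleMetricsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Print every producer metric whose name mentions throttling, e.g.
            // the average and maximum throttle-time metrics discussed above.
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                if (entry.getKey().name().contains("throttle-time")) {
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
                }
            }
        }
    }
}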



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi reopened KAFKA-5319:
---

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5320) Update produce/fetch throttle time metrics for any request throttle

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022728#comment-16022728
 ] 

ASF GitHub Bot commented on KAFKA-5320:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/3137

KAFKA-5320: Include all request throttling in client throttle metrics



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-5320

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3137


commit df83a196def521e50939f39d62bccd4e461a
Author: Rajini Sivaram 
Date:   2017-05-24T11:06:05Z

KAFKA-5320: Include all request throttling in client throttle metrics




> Update produce/fetch throttle time metrics for any request throttle
> ---
>
> Key: KAFKA-5320
> URL: https://issues.apache.org/jira/browse/KAFKA-5320
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.11.0.0
>
>
> From KIP-124:
> {quote}
> Producers and consumers currently expose average and maximum producer/fetch 
> request throttle time as JMX metrics. These metrics will be updated to 
> reflect total throttle time for the producer or consumer including byte-rate 
> throttling and request time throttling for all requests of the 
> producer/consumer. 
> {quote}
> Missed this during the request quota implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3137: KAFKA-5320: Include all request throttling in clie...

2017-05-24 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/3137

KAFKA-5320: Include all request throttling in client throttle metrics



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-5320

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3137


commit df83a196def521e50939f39d62bccd4e461a
Author: Rajini Sivaram 
Date:   2017-05-24T11:06:05Z

KAFKA-5320: Include all request throttling in client throttle metrics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi updated KAFKA-5319:
--
Attachment: (was: ClusterBalanceCommand.scala)

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi updated KAFKA-5319:
--
Status: Reopened  (was: Reopened)

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
>
> When a new broker is added to a cluster, it initially hosts no topics. When a 
> topic is created with the console command without 'replicaAssignmentOpt', 
> Kafka uses AdminUtils.assignReplicasToBrokers to compute the replica 
> assignment. Even if that assignment is balanced at creation time, the replica 
> distribution degrades as more brokers are added to the cluster. 
> 'kafka-reassign-partitions.sh' can be used to rebalance, but with a large 
> number of topics this is not easy. At topic creation time, Kafka also chooses 
> a preferred replica leader, placed at the first position of the AR, to 
> balance leadership, but that balance is destroyed when the cluster changes. 
> Partition reassignment via 'kafka-reassign-partitions.sh' may also destroy 
> leader balance, because the user can change the AR of a partition; the 
> cluster may then be unbalanced even though Kafka considers leader balance 
> fine as long as every leader is the first replica in the AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added or the cluster is otherwise unbalanced. 
> It does not handle moving replicas from a dead broker to a live one; it only 
> balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2017-05-24 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022828#comment-16022828
 ] 

Eno Thereska commented on KAFKA-4740:
-

I mentioned in KAFKA-5211 that such a change should be done with a KIP. 

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Assignee: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients version < 0.10.0.1, such 
> an exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by Integer

[jira] [Commented] (KAFKA-5211) KafkaConsumer should not skip a corrupted record after throwing an exception.

2017-05-24 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022824#comment-16022824
 ] 

Eno Thereska commented on KAFKA-5211:
-

[~becket_qin] I think this change needs a KIP. There have been several JIRAs 
like KAFKA-5078, KAFKA-4740 etc that have a lot of discussion on the PR. I 
personally haven't understood the ramifications of these changes. They affect 
users since they need to know what to do in case bad data is in the pipeline. 
Could you prepare a KIP on this? Thanks.

> KafkaConsumer should not skip a corrupted record after throwing an exception.
> -
>
> Key: KAFKA-5211
> URL: https://issues.apache.org/jira/browse/KAFKA-5211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>  Labels: clients, consumer
> Fix For: 0.11.0.0
>
>
> In 0.10.2, when there is a corrupted record, KafkaConsumer.poll() will throw 
> an exception and block on that corrupted record. In the latest trunk this 
> behavior has changed to skip the corrupted record (which is the old consumer 
> behavior). With KIP-98, skipping corrupted messages would be a little 
> dangerous as the message could be a control message for a transaction. We 
> should fix the issue to let the KafkaConsumer block on the corrupted messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4857) Use AdminClient in Kafka Streams

2017-05-24 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4857:

Summary: Use AdminClient in Kafka Streams  (was: Improve Client handling)

> Use AdminClient in Kafka Streams
> 
>
> Key: KAFKA-4857
> URL: https://issues.apache.org/jira/browse/KAFKA-4857
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>  Labels: needs-kip
>
> Streams uses {{KafkaClientSupplier}} to get 
> consumer/restore-consumer/producer clients. Streams also uses one more client 
> for admin purposes, namely {{StreamsKafkaClient}}, which is instantiated 
> "manually".
> With the newly upcoming {{AdminClient}} from KIP-117, we can simplify (or 
> even replace) {{StreamsKafkaClient}} with the new {{AdminClient}}. We 
> furthermore want to unify how this client is created and extend 
> {{KafkaClientSupplier}} with a method that returns it.
> As this is a public API change, a KIP is required.
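
A rough sketch of the direction described above; the existing {{KafkaClientSupplier}} methods are 
paraphrased here, and the {{getAdminClient}} method is purely hypothetical until the KIP settles 
the API:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

// Sketch of the supplier interface with the proposed admin-client hook; the real
// interface lives in org.apache.kafka.streams and this is only an illustration.
public interface KafkaClientSupplier {

    Producer<byte[], byte[]> getProducer(Map<String, Object> config);

    Consumer<byte[], byte[]> getConsumer(Map<String, Object> config);

    Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config);

    // Hypothetical addition: hand Streams the KIP-117 AdminClient the same way it
    // gets its other clients, so StreamsKafkaClient no longer has to be built "manually".
    AdminClient getAdminClient(Map<String, Object> config);
}
{code}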



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [Vote] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Bill Bejeck
+1 for the KIP, and +1 to what Xavier said as well.

On Wed, May 24, 2017 at 3:57 AM, Damian Guy  wrote:

> Also, +1 for the KIP
>
> On Wed, 24 May 2017 at 08:57 Damian Guy  wrote:
>
> > +1 to what Xavier said
> >
> > On Wed, 24 May 2017 at 06:45 Xavier Léauté  wrote:
> >
> >> I don't think we should wait for entries from each stream, since that
> >> might
> >> limit the usefulness of the cogroup operator. There are instances where
> it
> >> can be useful to compute something based on data from one or more
> stream,
> >> without having to wait for all the streams to produce something for the
> >> group. In the example I gave in the discussion, it is possible to
> compute
> >> impression/auction statistics without having to wait for click data,
> which
> >> can typically arrive several minutes late.
> >>
> >> We could have a separate discussion around adding inner / outer
> modifiers
> >> to each of the streams to decide which fields are optional / required
> >> before sending updates if we think that might be useful.
> >>
> >>
> >>
> >> On Tue, May 23, 2017 at 6:28 PM Guozhang Wang 
> wrote:
> >>
> >> > The proposal LGTM, +1
> >> >
> >> > One question I have is about when to send the record to the resulted
> >> KTable
> >> > changelog. For example in your code snippet in the wiki page, before
> you
> >> > see the end result of
> >> >
> >> > 1L, Customer[
> >> >
> >> >   cart:{Item[no:01], Item[no:03], Item[no:04]},
> >> >   purchases:{Item[no:07], Item[no:08]},
> >> >   wishList:{Item[no:11]}
> >> >   ]
> >> >
> >> >
> >> > You will first see
> >> >
> >> > 1L, Customer[
> >> >
> >> >   cart:{Item[no:01]},
> >> >   purchases:{},
> >> >   wishList:{}
> >> >   ]
> >> >
> >> > 1L, Customer[
> >> >
> >> >   cart:{Item[no:01]},
> >> >   purchases:{Item[no:07],Item[no:08]},
> >> >
> >> >   wishList:{}
> >> >   ]
> >> >
> >> > 1L, Customer[
> >> >
> >> >   cart:{Item[no:01]},
> >> >   purchases:{Item[no:07],Item[no:08]},
> >> >
> >> >   wishList:{}
> >> >   ]
> >> >
> >> > ...
> >> >
> >> >
> >> > I'm wondering if it makes more sense to only start sending the update
> if
> >> > the corresponding agg-key has seen at least one input from each of the
> >> > input stream? Maybe it is out of the scope of this KIP and we can make
> >> it a
> >> > more general discussion in a separate one.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Fri, May 19, 2017 at 8:37 AM, Xavier Léauté 
> >> > wrote:
> >> >
> >> > > Hi Kyle, I left a few more comments in the discussion thread, if you
> >> > > wouldn't mind taking a look
> >> > >
> >> > > On Fri, May 19, 2017 at 5:31 AM Kyle Winkelman <
> >> winkelman.k...@gmail.com
> >> > >
> >> > > wrote:
> >> > >
> >> > > > Hello all,
> >> > > >
> >> > > > I would like to start the vote on KIP-150.
> >> > > >
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+
> >> > > Kafka-Streams+Cogroup
> >> > > >
> >> > > > Thanks,
> >> > > > Kyle
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
>


Kafka Read Data from All Partition Using Key or Timestamp

2017-05-24 Thread SenthilKumar K
Hi All, we have been using Kafka for our use case, which helps in
delivering real-time raw logs. I have a requirement to fetch data from
Kafka using offsets.

DataSet Example :
{"access_date":"2017-05-24
13:57:45.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:57:46.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:57:47.044","format":"json","start":"1490296463.031"}
{"access_date":"2017-05-24
13:58:02.042","format":"json","start":"1490296463.031"}

The above JSON data will be stored in Kafka.

Key --> access_date in epoch format
Value --> whole JSON.

Data Access Pattern:
  1) Get me the last 2 mins of data?
  2) Get me records between 2017-05-24 13:57:42:00 and 2017-05-24
13:57:44:00?

How to achieve this in Kafka ?

I tried using SimpleConsumer, but it expects a partition, and I am not sure
SimpleConsumer would match our requirement...

Appreciate your help!

Cheers,
Senthil
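
A minimal sketch of one way to serve the "records since a timestamp" pattern, assuming
kafka-clients 0.10.1+ (where KafkaConsumer#offsetsForTimes is available) and a broker/topic with
message timestamps enabled; the topic name and configs below are placeholders:

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimeRangeReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            String topic = "raw-logs";                                   // placeholder topic
            long startTs = System.currentTimeMillis() - 2 * 60 * 1000L;  // "last 2 mins"

            // Ask the broker, per partition, for the earliest offset whose timestamp >= startTs.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo pi : consumer.partitionsFor(topic)) {
                query.put(new TopicPartition(topic, pi.partition()), startTs);
            }
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(query);

            // Seek every partition to that offset and read forward from there.
            consumer.assign(query.keySet());
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : startOffsets.entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                }
            }
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(r -> System.out.println(r.timestamp() + " " + r.value()));
        }
    }
}

For an end timestamp, keep polling and stop once the record timestamps pass the upper bound.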


[jira] [Reopened] (KAFKA-4340) Change the default value of log.message.timestamp.difference.max.ms to the same as log.retention.ms

2017-05-24 Thread Magnus Edenhill (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Magnus Edenhill reopened KAFKA-4340:


> Change the default value of log.message.timestamp.difference.max.ms to the 
> same as log.retention.ms
> ---
>
> Key: KAFKA-4340
> URL: https://issues.apache.org/jira/browse/KAFKA-4340
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> [~junrao] brought up the following scenario: 
> If users are pumping data with timestamp already passed log.retention.ms into 
> Kafka, the messages will be appended to the log but will be immediately 
> rolled out by log retention thread when it kicks in and the messages will be 
> deleted. 
> To avoid this produce-and-deleted scenario, we can set the default value of 
> log.message.timestamp.difference.max.ms to be the same as log.retention.ms.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4340) Change the default value of log.message.timestamp.difference.max.ms to the same as log.retention.ms

2017-05-24 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022945#comment-16022945
 ] 

Magnus Edenhill commented on KAFKA-4340:


While the idea behind this JIRA is good (as a means of optimization) I think it 
might be troublesome in practice.

If a producer sends a batch of N messages, with one message being too old, the 
entire batch will fail (errors are propagated per partition, not message) and 
the producer can't really recover and retry gracefully to produce the 
non-timedout messages.

This problem is not related to a specific client, but rather the nature of the 
data being produced:
it will manifest itself with old timestamps, such as app-sourced timestamps, or 
things like MM.

A better alternative would perhaps be to silently discard the message on the 
broker instead (which is effectively the same as writing the message to log and 
then immediately cleaning it before a consumer picks up the message).

> Change the default value of log.message.timestamp.difference.max.ms to the 
> same as log.retention.ms
> ---
>
> Key: KAFKA-4340
> URL: https://issues.apache.org/jira/browse/KAFKA-4340
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> [~junrao] brought up the following scenario: 
> If users are pumping data with timestamp already passed log.retention.ms into 
> Kafka, the messages will be appended to the log but will be immediately 
> rolled out by log retention thread when it kicks in and the messages will be 
> deleted. 
> To avoid this produce-and-deleted scenario, we can set the default value of 
> log.message.timestamp.difference.max.ms to be the same as log.retention.ms.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2046: MINOR: Improve the help doc of consumer group comm...

2017-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2046


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5315) Streams exception w/ partially processed record corrupts state store

2017-05-24 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155
 ] 

Eno Thereska commented on KAFKA-5315:
-

[~mjsax] isn't this a fundamental problem that exactly-once and transactions 
are supposed to solve? Falls into the general category of "produce to a bunch 
of topics + state store" atomically. What am I missing?

> Streams exception w/ partially processed record corrupts state store
> 
>
> Key: KAFKA-5315
> URL: https://issues.apache.org/jira/browse/KAFKA-5315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Mathieu Fenniak
>Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is 
> inserted into the state store before being forwarded to downstream 
> processors, and may persist in the state store even if downstream processing 
> fails due to an exception.  The persisted state store record may later affect 
> any attempt to restart processing after the exception.
> Specific example series of events in a simple topology: a single KTable 
> source, group by a field in the value, aggregate that adds up another field, 
> output to a topic --
> 1. A new record (record A) is received by the source KTable, and put in the 
> KTable RocksDB state store.
> 2. While processing record A, an exception happens preventing producing to 
> Kafka. (eg, a TimeoutException Failed to
> update metadata after 6 ms).
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local 
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the 
> task.
> 7. New thread initializes its state store for the KTable, but it's on the 
> same host as the original thread, so it still contains the k/v for record A.
> 8. New thread resumes consuming at the last committed offset, which is before 
> record A.
> 9. When processing record A, the new thread reads the value that was written 
> to the state store in step #1 by record A's key.
> 10. The repartition map receives a Change with both an oldValue and a
> newValue, and forwards a Change(null, v) and Change(v, null)
> 11. The aggregation ends up both subtracting and adding the value of record 
> A, resulting in an incorrect & persistent output.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5063:
---
Status: Reopened  (was: Reopened)

> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>  Labels: unit-test
> Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5063.

Resolution: Fixed

We got a new JIRA for the new stack trace: 
https://issues.apache.org/jira/browse/KAFKA-5288

> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>  Labels: unit-test
> Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5063:


> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>  Labels: unit-test
> Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5315) Streams exception w/ partially processed record corrupts state store

2017-05-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023185#comment-16023185
 ] 

Matthias J. Sax commented on KAFKA-5315:


EOS will fix this automatically. But we can (and thus should) also fix this 
issue for at-least-once (at least for this special case).

> Streams exception w/ partially processed record corrupts state store
> 
>
> Key: KAFKA-5315
> URL: https://issues.apache.org/jira/browse/KAFKA-5315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Mathieu Fenniak
>Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is 
> inserted into the state store before being forwarded to downstream 
> processors, and may persist in the state store even if downstream processing 
> fails due to an exception.  The persisted state store record may later affect 
> any attempt to restart processing after the exception.
> Specific example series of events in a simple topology: a single KTable 
> source, group by a field in the value, aggregate that adds up another field, 
> output to a topic --
> 1. A new record (record A) is received by the source KTable, and put in the 
> KTable RocksDB state store.
> 2. While processing record A, an exception happens preventing producing to 
> Kafka. (eg, a TimeoutException Failed to
> update metadata after 6 ms).
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local 
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the 
> task.
> 7. New thread initializes its state store for the KTable, but it's on the 
> same host as the original thread, so it still contains the k/v for record A.
> 8. New thread resumes consuming at the last committed offset, which is before 
> record A.
> 9. When processing record A, the new thread reads the value that was written 
> to the state store in step #1 by record A's key.
> 10. The repartition map receives a Change with both an oldValue and a
> newValue, and forwards a Change(null, v) and Change(v, null)
> 11. The aggregation ends up both subtracting and adding the value of record 
> A, resulting in an incorrect & persistent output.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5156) Options for handling exceptions in streams

2017-05-24 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-5156:

Fix Version/s: (was: 0.11.0.0)
   0.11.1.0

> Options for handling exceptions in streams
> --
>
> Key: KAFKA-5156
> URL: https://issues.apache.org/jira/browse/KAFKA-5156
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>  Labels: user-experience
> Fix For: 0.11.1.0
>
>
> This is a task around options for handling exceptions in streams. It focuses 
> on options for dealing with corrupt data (keep going, stop streams, log, 
> retry, etc).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-24 Thread Colin McCabe
On Tue, May 23, 2017, at 19:07, Guozhang Wang wrote:
> I think using a single config to cover end-to-end latency with connecting
> and request round-trip may not be appropriate, since 1) some requests
> may need much more time than others since they are parked (fetch request
> with long polling, join group request etc) or throttled, 

Hmm.  My proposal was to implement _both_ end-to-end timeouts and
per-call timeouts.  In that case, some requests needing much more time
than others should not be a concern, since we can simply set a higher
per-call timeout on the requests we think will need more time.

> and 2) some
> requests are prerequisites of others, like the group request to discover the
> coordinator before the fetch offset request, and implementation wise
> these
> request send/receive is embedded in latter ones, hence it is not clear if
> the `request.timeout.ms` should cover just a single RPC or more.

As far as I know, the request timeout has always covered a single RPC.  If
we want to implement a higher level timeout that spans multiple RPCs, we
can set the per-call timeouts appropriately.  For example:

> long deadline = System.currentTimeMillis() + 6;
> callA(callTimeout = deadline - System.currentTimeMillis())
> callB(callTimeout = deadline - System.currentTimeMillis())
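
A slightly fuller, generic Java sketch of that deadline-propagation idea; callA/callB above and
TimedCall below are placeholders, not real client APIs:

// One end-to-end budget; each call is bounded by whatever is left of it.
import java.util.concurrent.TimeoutException;

public class DeadlineExample {

    interface TimedCall {
        void invoke(long callTimeoutMs) throws TimeoutException;
    }

    static void runWithDeadline(long totalTimeoutMs, TimedCall... calls) throws TimeoutException {
        long deadline = System.currentTimeMillis() + totalTimeoutMs;
        for (TimedCall call : calls) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("end-to-end deadline exceeded");
            }
            call.invoke(remaining); // each RPC gets only the remaining budget
        }
    }
}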

> 
> So no matter whether we add a `connect.timeout.ms` in addition to `
> request.timeout.ms`, we should consider adding per-request-type timeout
> value, and make `request.timeout.ms` a global default; if we add the `
> connect.timeout.ms` the per-request value is only for the round trip,
> otherwise it is supposed to include the connecting time. Personally I'd
> prefer the first option to add a universal `connect.timeout.ms`, and in
> another KIP consider adding per-request-type timeout overrides.

Why have a special case for time spent connecting, though?  Why would
the user care where the time went, as long as the timeout was met?  It
feels like this is just a hack because we couldn't raise
request.timeout.ms to the value that it "should" have been at for the
shorter requests.  As someone already commented, it's confusing to have
all these knobs that we don't really need.

> 
> BTW if the consumer issue is the only cause that we are having a high
> default value, I'd suggest we separate the consumer rebalance timeout and
> not piggy-back on the session timeout. Then we can set the default `
> request.timeout.ms` to a smaller value, like 10 secs. This is orthogonal
> to
> this KIP discussion and we can continue this in a separate thread.

+1

cheers,
Colin

> 
> 
> Guozhang
> 
> On Tue, May 23, 2017 at 3:31 PM, Colin McCabe  wrote:
> 
> > Another note-- it would be really nice if timeouts were end-to-end,
> > rather than being set for particular phases of an RPC.  From a user point
> > of view, a 30 second timeout should mean that the call either succeeds
> > or fails after 30 seconds, regardless of how much time is spent looking
> > for metadata, connecting to brokers, waiting for brokers, etc.  This is
> > implemented in AdminClient by setting a deadline when the call is first
> > created and referring to that afterwards.
> >
> > best,
> > Colin
> >
> >
> > On Tue, May 23, 2017, at 13:18, Colin McCabe wrote:
> > > In the AdminClient, we allow setting per-call timeouts.  The global
> > > timeout is just a default.  It seems like that is really what we should
> > > do in the producer and consumer as well, rather than having a lot of
> > > special cases for timeouts in  connecting vs. other call states.  Then
> > > join requests could gave a 5 minute timeout, but other requests could
> > > gave a shorter one.  Thoughts?
> > >
> > > Cheers,
> > > Colin
> > >
> > > On Tue, May 23, 2017, at 04:27, Rajini Sivaram wrote:
> > > > Guozhang,
> > > >
> > > > At the moment we don't have a connect timeout. And the behaviour
> > > > suggested
> > > > in the KIP is useful to address this.
> > > >
> > > > We do however have a request.timeout.ms. This is the amount of time it
> > > > would take to detect a crashed broker if the broker crashed after a
> > > > connection was established. Unfortunately in the consumer, this was
> > > > increased to > 5minutes since JoinRequest can take up to
> > > > max.poll.interval.ms, which has a default of  5 minutes. Since the
> > > > whole point of this timeout is to detect a crashed broker, 5 minutes is
> > > > too
> > > > large.
> > > >
> > > > My suggestion was to use request.timeout.ms to also detect connection
> > > > timeouts to a crashed broker - implement the behavior suggested in the
> > > > KIP
> > > > without adding a new config parameter. As Ismael has said, this will
> > need
> > > > to fix request.timeout.ms in the consumer.
> > > >
> > > >
> > > > On Mon, May 22, 2017 at 1:23 PM, Simon Souter <
> > sim...@cakesolutions.net>
> > > > wrote:
> > > >
> > > > > The following tickets are probably relevant to this KIP:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3457
> > > > > 

[GitHub] kafka pull request #3138: KAFKA-5017: Record batch first offset remains accu...

2017-05-24 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3138

KAFKA-5017: Record batch first offset remains accurate after compaction



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-5017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3138


commit c24ba61c27abb12d86705b9f0c7e322e24cdf692
Author: Jason Gustafson 
Date:   2017-05-24T18:37:26Z

KAFKA-5017: Record batch first offset remains accurate after compaction




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5017) Consider making baseOffset the first offset in message format v2

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023426#comment-16023426
 ] 

ASF GitHub Bot commented on KAFKA-5017:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3138

KAFKA-5017: Record batch first offset remains accurate after compaction



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-5017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3138


commit c24ba61c27abb12d86705b9f0c7e322e24cdf692
Author: Jason Gustafson 
Date:   2017-05-24T18:37:26Z

KAFKA-5017: Record batch first offset remains accurate after compaction




> Consider making baseOffset the first offset in message format v2
> 
>
> Key: KAFKA-5017
> URL: https://issues.apache.org/jira/browse/KAFKA-5017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Currently baseOffset starts as the first offset of the batch. If the first 
> record is removed by compaction, baseOffset doesn't change and it is no 
> longer the same as the first offset of the batch. This is inconsistent with 
> every other field in the record batch header and it seems like there is no 
> longer a reason for this behaviour.
> We should consider simplifying the behaviour so that baseOffset is simply the 
> first offset of the record batch. We need to do this before 0.11 or we 
> probably won't be able to do it until the next message format version change.
> cc [~hachikuji]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5017) Consider making baseOffset the first offset in message format v2

2017-05-24 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5017:
---
Assignee: Jason Gustafson
  Status: Patch Available  (was: Open)

> Consider making baseOffset the first offset in message format v2
> 
>
> Key: KAFKA-5017
> URL: https://issues.apache.org/jira/browse/KAFKA-5017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Ismael Juma
>Assignee: Jason Gustafson
> Fix For: 0.11.0.0
>
>
> Currently baseOffset starts as the first offset of the batch. If the first 
> record is removed by compaction, baseOffset doesn't change and it is no 
> longer the same as the first offset of the batch. This is inconsistent with 
> every other field in the record batch header and it seems like there is no 
> longer a reason for this behaviour.
> We should consider simplifying the behaviour so that baseOffset is simply the 
> first offset of the record batch. We need to do this before 0.11 or we 
> probably won't be able to do it until the next message format version change.
> cc [~hachikuji]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-24 Thread Jeyhun Karimov
Hi Michal,

Thanks for your comments. I see your point and I agree with it. However, I
don't have a better idea for naming. I checked the MR source code; there, two
different interfaces are used, JobConfigurable and Closable. Maybe we can
rename RichFunction to Configurable?


Cheers,
Jeyhun

On Tue, May 23, 2017 at 2:58 PM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi Jeyhun,
>
> I understand your argument about "Rich" in RichFunctions. Perhaps I'm just
> being too puritan here, but let me ask this anyway:
>
> What is it that makes something a function? To me a function is something
> that takes zero or more arguments and possibly returns a value and while it
> may have side-effects (as opposed to "pure functions" which can't), it
> doesn't have any life-cycle of its own. This is what, in my mind,
> distinguishes the concept of a "function" from that of more vaguely defined
> concepts.
>
> So if we add a life-cycle to a function, in that understanding, it doesn't
> become a rich function but instead stops being a function altogether.
>
> You could say it's "just semantics" but to me precise use of language in
> the given context is an important foundation for good engineering. And in
> the context of programming "function" has a precise meaning. Of course we
> can say that in the context of Kafka Streams "function" has a different,
> looser meaning but I'd argue that won't do anyone any good.
>
> On the other hand other frameworks such as Flink use this terminology, so
> it could be that consistency is the reason. I'm guessing that's why the
> name was proposed in the first place. My point is simply that it's a poor
> choice of wording and Kafka Streams don't have to follow that to the letter.
>
> Cheers,
>
> Michal
>
> On 23/05/17 13:26, Jeyhun Karimov wrote:
>
> Hi Michal,
>
> Thanks for your comments.
>
>
> To me at least it feels strange that something is called a function yet
>> doesn't follow the functional interface definition of having just one
>> abstract method. I suppose init and close could be made default methods
>> with empty bodies once Java 7 support is dropped to mitigate that concern.
>> Still, I feel some resistance to consider something that requires
>> initialisation and closing (which implies holding state) as being a
>> function. Sounds more like the Processor/Transformer kind of thing
>> semantically, rather than a function.
>
>
>  -  If the interface were named just Function, your assumptions would
> hold. However, the keyword Rich by definition implies that we have a
> function (as you described, with one abstract method, etc.) but a rich
> one, so there are multiple methods in it.
> Ideally it should be:
>
> public interface RichFunction extends Function {  // this is the
> Function that you described
>   void close();
>   void init(Some params);
>...
> }
>
>
> The KIP says there are multiple use-cases for this but doesn't enumerate
>> any - I think some examples would be useful, otherwise that section sounds
>> a little bit vague.
>
>
> I thought it is obvious by definition but I will update it. Thanks.
>
>
> IMHO, it's the access to the RecordContext is where the added value lies
>> but maybe I'm just lacking in imagination, so I'm asking all this to better
>> understand the rationale for init() and close().
>
>
> Maybe I should add some examples. Thanks.
>
>
> Cheers,
> Jeyhun
>
> On Mon, May 22, 2017 at 11:02 AM, Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> Hi Jeyhun,
>>
>> I'd like to understand better the premise of RichFunctions and why init(Some
>> params), close() are said to be needed.
>> To me at least it feels strange that something is called a function yet
>> doesn't follow the functional interface definition of having just one
>> abstract method. I suppose init and close could be made default methods
>> with empty bodies once Java 7 support is dropped to mitigate that concern.
>> Still, I feel some resistance to consider something that requires
>> initialisation and closing (which implies holding state) as being a
>> function. Sounds more like the Processor/Transformer kind of thing
>> semantically, rather than a function.
>>
>> The KIP says there are multiple use-cases for this but doesn't enumerate
>> any - I think some examples would be useful, otherwise that section sounds
>> a little bit vague.
>>
>> IMHO, it's the access to the RecordContext is where the added value lies
>> but maybe I'm just lacking in imagination, so I'm asking all this to better
>> understand the rationale for init() and close().
>>
>> Thanks,
>> Michał
>>
>> On 20/05/17 17:05, Jeyhun Karimov wrote:
>>
>> Dear community,
>>
>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
>> KIP for rich functions (interfaces) [2].
>> I would like to get your comments.
>>
>>
>> [1]http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
>> [2

[jira] [Created] (KAFKA-5321) MemoryRecords.filterTo can return corrupt data if output buffer is not large enough

2017-05-24 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5321:
--

 Summary: MemoryRecords.filterTo can return corrupt data if output 
buffer is not large enough
 Key: KAFKA-5321
 URL: https://issues.apache.org/jira/browse/KAFKA-5321
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Priority: Blocker
 Fix For: 0.11.0.0


Due to KAFKA-5316, it is possible for a record set to grow during cleaning and 
overflow the output buffer allocated for writing. When we reach the record set 
which is doomed to overflow the buffer, there are two possibilities:

1. No records were removed and the original entry is directly appended to the 
log. This results in the overflow reported in KAFKA-5316.
2. Records were removed and a new record set is built. 

Here we are concerned with the latter case. The problem is that the builder code 
automatically allocates a new buffer when we reach the end of the existing 
buffer and does not reset the position in the original buffer. Since 
{{MemoryRecords.filterTo}} continues using the old buffer, this can lead to 
data corruption after cleaning (the data left in the overflowed buffer is 
garbage). 

Note that this issue could get fixed as part of a general solution to KAFKA-5316, 
but if that seems too risky, we might fix this separately. A simple solution is 
to make both paths consistent and ensure that we raise an exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Guozhang Wang
Kyle,

Thanks for the explanations, my previous read on the wiki examples was
wrong.

So I guess my motivation should be "reduced" to: can we move the window
specs param from "KGroupedStream#cogroup(..)" to
"CogroupedKStream#aggregate(..)", and my motivations are:

1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
2. major: this is for extensibility of the APIs, and since we are removing
the "Evolving" annotations on Streams it may be harder to change it again
in the future. The extended use cases are that people wanted to have
windowed running aggregates on different granularities, e.g. "give me the
counts per-minute, per-hour, per-day and per-week", and today in DSL we
need to specify that case with multiple aggregate operators, each of which gets a
state store / changelog, etc. And it is possible to optimize it down to
a single state store. Its implementation would be tricky, as you need to
hold windows of different lengths within your window store, but just from
the public API point of view, it could be specified as:

CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");

table1 = stream.aggregate(/*per-minute window*/)
table2 = stream.aggregate(/*per-hour window*/)
table3 = stream.aggregate(/*per-day window*/)

while underlying we are only using a single store "state-store-name" for it.


Although this feature is out of the scope of this KIP, I'd like to discuss
if we can "leave the door open" to make such changes without modifying the
public APIs .

Guozhang


On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman 
wrote:

> I allow defining a single window/sessionwindow one time when you make the
> cogroup call from a KGroupedStream. From then on you are using the cogroup
> call from within CogroupedKStream, which doesn't accept any additional
> windows/sessionwindows.
>
> Is this what you meant by your question or did I misunderstand?
>
> On May 23, 2017 9:33 PM, "Guozhang Wang"  wrote:
>
> Another question that came to me is on "window alignment": from the KIP it
> seems you are allowing users to specify a (potentially different) window
> spec in each co-grouped input stream. So if these window specs are
> different how should we "align" them with different input streams? I think
> it is more natural to only specify one window spec in the
>
> KTable CogroupedKStream#aggregate(Windows);
>
>
> And remove it from the cogroup() functions. WDYT?
>
>
> Guozhang
>
> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang  wrote:
>
> > Thanks for the proposal Kyle, this is a quite common use case to support
> > such multi-way table join (i.e. N source tables with N aggregate func)
> with
> > a single store and N+1 serdes, I have seen lots of people using the
> > low-level PAPI to achieve this goal.
> >
> >
> > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> winkelman.k...@gmail.com
> > > wrote:
> >
> >> I like your point about not handling other cases such as count and
> reduce.
> >>
> >> I think that reduce may not make sense because reduce assumes that the
> >> input values are the same as the output values. With cogroup there may
> be
> >> multiple different input types and then your output type can't be
> multiple
> >> different things. In the case where you have all matching value types
> you
> >> can do KStreamBuilder#merge followed by the reduce.
> >>
> >> As for count I think it is possible to call count on all the individual
> >> grouped streams and then do joins. Otherwise we could maybe make a
> special
> >> call in groupedstream for this case. Because in this case we don't need
> to
> >> do type checking on the values. It could be similar to the current count
> >> methods but accept varargs of additional grouped streams as well and
> >> make
> >> sure they have a key type of K.
> >>
> >> The way I have put the kip together is to ensure that we do type
> checking.
> >> I don't see a way we could group them all first and then make a call to
> >> count, reduce, or aggregate because with aggregate they would need to
> pass
> >> a list of aggregators and we would have no way of type checking that
> they
> >> match the grouped streams.
> >>
> >> Thanks,
> >> Kyle
> >>
> >> On May 19, 2017 11:42 AM, "Xavier Léauté"  wrote:
> >>
> >> > Sorry to jump on this thread so late. I agree this is a very useful
> >> > addition and wanted to provide an additional use-case and some more
> >> > comments.
> >> >
> >> > This is actually a very common analytics use-case in the ad-tech
> >> industry.
> >> > The typical setup will have an auction stream, an impression stream,
> >> and a
> >> > click stream. Those three streams need to be combined to compute
> >> aggregate
> >> > statistics (e.g. impression statistics, and click-through rates),
> since
> >> > most of the attributes of interest are only present the auction
> stream.
> >> >
> >> > A simple way to do this is to co-group all the streams by the auction
> >> key,
> >> > and process updates to the co-group as events for each stream come

[jira] [Updated] (KAFKA-5251) Producer should drop queued sends when transaction is aborted

2017-05-24 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-5251:

Priority: Blocker  (was: Major)

> Producer should drop queued sends when transaction is aborted
> -
>
> Key: KAFKA-5251
> URL: https://issues.apache.org/jira/browse/KAFKA-5251
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> As an optimization, if a transaction is aborted, we can drop any records 
> which have not yet been sent to the brokers. However, to avoid the sequence 
> number getting out of sync, we need to continue sending any request which has 
> been sent at least once.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5251) Producer should drop queued sends when transaction is aborted

2017-05-24 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023603#comment-16023603
 ] 

Apurva Mehta commented on KAFKA-5251:
-

Raising this to blocker status. Currently, if there are fatal errors for things 
like `AddPartitionsToTxn`, we will keep requeuing this request with the pending 
partitions. Even if the user aborts the transaction, we will try to flush the 
pending `AddPartitions`. If the underlying error is not going to be resolved 
with retries, then this will go on forever, and the transaction can never 
abort. 

So we need to drop pending transactional requests, at a minimum when we are 
in an `ABORTING_TRANSACTION` state, so that the abort can be successfully 
completed. 

> Producer should drop queued sends when transaction is aborted
> -
>
> Key: KAFKA-5251
> URL: https://issues.apache.org/jira/browse/KAFKA-5251
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> As an optimization, if a transaction is aborted, we can drop any records 
> which have not yet been sent to the brokers. However, to avoid the sequence 
> number getting out of sync, we need to continue sending any request which has 
> been sent at least once.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4340) Change the default value of log.message.timestamp.difference.max.ms to the same as log.retention.ms

2017-05-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023619#comment-16023619
 ] 

Jiangjie Qin commented on KAFKA-4340:
-

[~edenhill] I think one of the well-established guarantees in producing is that 
either the entire batch succeeds or the entire batch fails. A lot of the code 
is actually built on top of this guarantee, for example the callback and the 
offsets in the callback.

So if one of the messages in the batch has a problem, by current design the 
entire batch will fail. I do agree that in this case the producer does not 
really have much to do, but that does not seem to be a problem introduced by this patch. 

Silently discarding the message on the broker side will introduce other problems, 
because the producer acks are per batch. The producer will assume the message 
has been successfully sent while it has actually been removed. And this also breaks the 
log retention ordering, which always removes an old segment before 
deleting messages in newer segments.
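
For reference, a minimal sketch of the per-record callback contract referred to above (standard 
producer API; the topic name and configs are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("topic", "key", "value"), (metadata, exception) -> {
                // The callback reports either success (metadata carries the assigned offset)
                // or failure (exception is non-null) for the record, which is why a silent
                // broker-side drop would leave the producer believing the send succeeded.
                if (exception != null) {
                    System.err.println("send failed: " + exception);
                } else {
                    System.out.println("appended at offset " + metadata.offset());
                }
            });
            producer.flush();
        }
    }
}
{code}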

> Change the default value of log.message.timestamp.difference.max.ms to the 
> same as log.retention.ms
> ---
>
> Key: KAFKA-4340
> URL: https://issues.apache.org/jira/browse/KAFKA-4340
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> [~junrao] brought up the following scenario: 
> If users are pumping data with timestamp already passed log.retention.ms into 
> Kafka, the messages will be appended to the log but will be immediately 
> rolled out by log retention thread when it kicks in and the messages will be 
> deleted. 
> To avoid this produce-and-deleted scenario, we can set the default value of 
> log.message.timestamp.difference.max.ms to be the same as log.retention.ms.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5211) KafkaConsumer should not skip a corrupted record after throwing an exception.

2017-05-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023641#comment-16023641
 ] 

Jiangjie Qin commented on KAFKA-5211:
-

[~enothereska] I think a KIP is only required if there is a public API change 
or a user-visible behavior change. 

Some clarification regarding this ticket: in 0.10.2.1 the consumer does not 
skip over a corrupted message. A refactor at some point after that changed the 
behavior to skip over corrupted messages, without a KIP. We are changing the 
behavior back to match 0.10.2.1 to avoid a user-visible behavior change. So I 
don't think a KIP is required for this ticket.
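For context, this is roughly what the restored behaviour means for an 
application: poll() keeps throwing on the corrupted record, and skipping it has 
to be an explicit decision. A hedged sketch (the generic exception type and the 
assumption that position() still points at the corrupted record are 
illustrative, as is whether the application should skip at all):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

class CorruptRecordHandlingSketch {
    // Assumes a consumer already subscribed to the topic and a known partition.
    static void pollLoop(KafkaConsumer<String, String> consumer, TopicPartition tp) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records)
                    process(record);
            } catch (KafkaException e) {
                // With the restored behaviour, the consumer stays on the corrupted
                // record; skipping it must be an explicit application decision.
                // (Assumption: position() still points at the corrupted record,
                // since it was never returned by poll().)
                long badOffset = consumer.position(tp);
                consumer.seek(tp, badOffset + 1);
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* ... */ }
}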

> KafkaConsumer should not skip a corrupted record after throwing an exception.
> -
>
> Key: KAFKA-5211
> URL: https://issues.apache.org/jira/browse/KAFKA-5211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>  Labels: clients, consumer
> Fix For: 0.11.0.0
>
>
> In 0.10.2, when there is a corrupted record, KafkaConsumer.poll() will throw 
> an exception and block on that corrupted record. In the latest trunk this 
> behavior has changed to skip the corrupted record (which is the old consumer 
> behavior). With KIP-98, skipping corrupted messages would be a little 
> dangerous as the message could be a control message for a transaction. We 
> should fix the issue to let the KafkaConsumer block on the corrupted messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5322) Resolve AddPartitions response error code inconsistency

2017-05-24 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5322:
--

 Summary: Resolve AddPartitions response error code inconsistency
 Key: KAFKA-5322
 URL: https://issues.apache.org/jira/browse/KAFKA-5322
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Priority: Blocker


The AddPartitions request does not support partial failures. Either all 
partitions are successfully added to the transaction or none of them are. 
Currently we return a separate error code for each partition that was added to 
the transaction, which raises the question of what error code to return when we 
have not actually encountered a fatal partition-level error for some partition. 
For example, suppose we send AddPartitions with partitions A and B. If A is not 
authorized, we will not even attempt to add B to the transaction, but what 
error code should we use for B? The current solution is to only include 
partition A and its error code in the response, but this is a little 
inconsistent with most other request types. Alternatives that have been 
proposed:

1. Instead of a partition-level error, use one global error. We can add a 
global error message to return friendlier details to the user about which 
partition had a fault. The downside is that we would have to parse the message 
contents if we wanted to do any partition-specific handling. We could not 
easily fill the set of topics in {{TopicAuthorizationException}} for example.
2. We can add a new error code to indicate that the broker did not even attempt 
to add the partition to the transaction. For example: OPERATION_NOT_ATTEMPTED 
or something like that. 
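For illustration only, here is how a client might handle option 2; the 
OPERATION_NOT_ATTEMPTED code and the flattened response shape are hypothetical, 
taken from the proposal above rather than from any released protocol:

import java.util.Map;

class AddPartitionsResponseHandler {
    enum ErrorCode { NONE, TOPIC_AUTHORIZATION_FAILED, OPERATION_NOT_ATTEMPTED }

    void handle(Map<String, ErrorCode> errorsByPartition) {
        errorsByPartition.forEach((partition, error) -> {
            switch (error) {
                case NONE:
                    break; // partition was added to the transaction
                case OPERATION_NOT_ATTEMPTED:
                    // Not a real failure for this partition: the broker skipped it
                    // because another partition failed; retry after fixing that one.
                    break;
                default:
                    throw new IllegalStateException(partition + " failed: " + error);
            }
        });
    }
}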



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-05-24 Thread kyle k (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023654#comment-16023654
 ] 

kyle k commented on KAFKA-3821:
---

[~rhauch], I agree with your point. I was thinking of the more limited case in 
which {{poll()}} returned 0 filtered records but wanted to record progress at 
the source. I like returning a special subtype of {{SourceRecord}}, with only 
the {{sourcePartition}} and {{sourceOffset}} present, for each and every source 
record I process. That makes for much cleaner 'streaming' code in the task. 

This would also apply to the {{Transformation}} API.
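A sketch of what such a subtype could look like; neither this class nor any 
framework handling for it exists in Kafka Connect today, so both the name and 
the skip-on-null-topic behaviour are assumptions:

import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class OffsetOnlySourceRecord extends SourceRecord {
    public OffsetOnlySourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        // No topic, schema, or value: the framework would commit the offset and
        // skip producing anything to Kafka (hypothetical behaviour).
        super(sourcePartition, sourceOffset, null, null, null);
    }
}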

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5211) KafkaConsumer should not skip a corrupted record after throwing an exception.

2017-05-24 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023656#comment-16023656
 ] 

Ismael Juma commented on KAFKA-5211:


This particular JIRA doesn't need a KIP because it's indeed just restoring the 
behaviour in trunk to match the behaviour from previous releases. We do need a 
KIP if we want to change how we handle errors during deserialization and such.

> KafkaConsumer should not skip a corrupted record after throwing an exception.
> -
>
> Key: KAFKA-5211
> URL: https://issues.apache.org/jira/browse/KAFKA-5211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>  Labels: clients, consumer
> Fix For: 0.11.0.0
>
>
> In 0.10.2, when there is a corrupted record, KafkaConsumer.poll() will throw 
> an exception and block on that corrupted record. In the latest trunk this 
> behavior has changed to skip the corrupted record (which is the old consumer 
> behavior). With KIP-98, skipping corrupted messages would be a little 
> dangerous as the message could be a control message for a transaction. We 
> should fix the issue to let the KafkaConsumer block on the corrupted messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4340) Change the default value of log.message.timestamp.difference.max.ms to the same as log.retention.ms

2017-05-24 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023668#comment-16023668
 ] 

Magnus Edenhill commented on KAFKA-4340:


Generally I would agree, but in this case I don't think there is a practical 
difference between discarding outdated messages and removing them with the 
retention cleaner; it is just a matter of time frame. Discarding outdated 
messages instead of appending them to the log is immediate, while the retention 
cleaner might kick in immediately or after whatever maximum interval it is 
configured with.
So from the producer and consumer perspectives the end result is pretty much 
the same: there is no guarantee that an outdated message will be seen by a 
consumer.

However, rejecting the entire batch means there will be guaranteed data loss in 
the message stream: the producer will not try to re-send those failed messages, 
even if all but one message in the batch were actually okay. I strongly feel 
this is undesired behaviour from the application's point of view.


> Change the default value of log.message.timestamp.difference.max.ms to the 
> same as log.retention.ms
> ---
>
> Key: KAFKA-4340
> URL: https://issues.apache.org/jira/browse/KAFKA-4340
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> [~junrao] brought up the following scenario: 
> If users are pumping data with timestamp already passed log.retention.ms into 
> Kafka, the messages will be appended to the log but will be immediately 
> rolled out by log retention thread when it kicks in and the messages will be 
> deleted. 
> To avoid this produce-and-deleted scenario, we can set the default value of 
> log.message.timestamp.difference.max.ms to be the same as log.retention.ms.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront

2017-05-24 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5323:
---

 Summary: AdminUtils.createTopic should check topic existence 
upfront
 Key: KAFKA-5323
 URL: https://issues.apache.org/jira/browse/KAFKA-5323
 Project: Kafka
  Issue Type: Improvement
Reporter: Onur Karaman
Assignee: Onur Karaman


When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper 
reads where N is the number of brokers. Here is the breakdown of the N+2 
zookeeper reads:
# reads the current list of brokers in zookeeper (1 zookeeper read)
# reads metadata for each broker in zookeeper (N zookeeper reads where N is the 
number of brokers)
# checks for topic existence in zookeeper (1 zookeeper read)

We can reduce the N+2 reads down to 1 by checking topic existence upfront.
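An illustrative sketch of the proposed reordering; the ZkClient interface below 
is a stand-in for the ZooKeeper access layer, not the real kafka.utils.ZkUtils 
API:

import java.util.List;

class TopicCreator {
    interface ZkClient {
        boolean topicExists(String topic);     // 1 read
        List<Integer> brokerIds();             // 1 read
        String brokerMetadata(int brokerId);   // 1 read per broker
    }

    private final ZkClient zk;
    TopicCreator(ZkClient zk) { this.zk = zk; }

    void createTopic(String topic, int partitions, int replicationFactor) {
        // Check existence first: when the topic already exists we stop after a
        // single ZooKeeper read instead of N+2.
        if (zk.topicExists(topic))
            throw new IllegalStateException("Topic already exists: " + topic);
        List<Integer> brokers = zk.brokerIds();
        brokers.forEach(zk::brokerMetadata);
        // ... compute the replica assignment and write it to ZooKeeper ...
    }
}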



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3139: MINOR: fix flakiness in testDeleteAcls

2017-05-24 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3139

MINOR: fix flakiness in testDeleteAcls

This call to isCompletedExceptionally introduced a race condition
because the future might not have been completed.  assertFutureError
checks that the exception is present and of the correct type in any
case, so the call was not necessary.
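A sketch of the race being removed, using java.util.concurrent.CompletableFuture 
as a stand-in for the admin client's KafkaFuture; the expected exception type is 
illustrative:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class DeleteAclsRaceSketch {
    static void flaky(CompletableFuture<Void> future) {
        // Racy: the broker response may not have arrived yet, so this can be
        // false even though the call will eventually fail.
        if (!future.isCompletedExceptionally())
            throw new AssertionError("expected a failed future");
    }

    static void robust(CompletableFuture<Void> future) throws InterruptedException {
        // Waiting for completion and inspecting the cause is race-free.
        try {
            future.get();
            throw new AssertionError("expected a failed future");
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof SecurityException)) // expected type is illustrative
                throw new AssertionError("unexpected exception type: " + e.getCause());
        }
    }
}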

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka fix-test-deleteacls

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3139


commit 36057c8833cbbd9908646f01fb164ce7e68d7cf9
Author: Colin P. Mccabe 
Date:   2017-05-24T21:29:59Z

MINOR: fix flakiness in testDeleteAcls

This call to isCompletedExceptionally introduced a race condition
because the future might not have been completed.  assertFutureError
checks that the exception is present and of the correct type in any
case, so the call was not necessary.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-2060) Async onCompletion callback may not be called

2017-05-24 Thread Bill Sobel (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Sobel resolved KAFKA-2060.
---
   Resolution: Fixed
Fix Version/s: 0.9.0.2

Per the notes, this was fixed in later Kafka client releases.

> Async onCompletion callback may not be called
> -
>
> Key: KAFKA-2060
> URL: https://issues.apache.org/jira/browse/KAFKA-2060
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.2
> Environment: All
>Reporter: Bill Sobel
>Priority: Critical
>  Labels: easyfix
> Fix For: 0.9.0.2
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 'done' function in RecordBatch.java attempts to enumerate and call each 
> onCompletion() callback. However, the call to thunk.future.get() can throw an 
> exception. When this occurs, the callback is not invoked. This appears to be 
> the only place where a callback for an async send would not occur, leaving the 
> callback orphaned.
> The call to thunk.future.get() appears to need to occur in its own try/catch, 
> with onCompletion then called with the result if it doesn't throw, or 
> thunk.callback.onCompletion(null, recordException) if it does.
> e.g.
> /**
>  * Complete the request
>  *
>  * @param baseOffset The base offset of the messages assigned by the server
>  * @param exception The exception that occurred (or null if the request was successful)
>  */
> public void done(long baseOffset, RuntimeException exception) {
>     this.produceFuture.done(topicPartition, baseOffset, exception);
>     log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
>               topicPartition,
>               baseOffset,
>               exception);
>     // execute callbacks
>     for (int i = 0; i < this.thunks.size(); i++) {
>         try {
>             Thunk thunk = this.thunks.get(i);
>             if (exception == null) {
>                 RecordMetadata rc = null;
>                 try {
>                     rc = thunk.future.get();
>                 } catch (Exception recordException) {
>                     thunk.callback.onCompletion(null, recordException);
>                 }
>                 if (rc != null) {
>                     thunk.callback.onCompletion(rc, null);
>                 }
>             } else {
>                 thunk.callback.onCompletion(null, exception);
>             }
>         } catch (Exception e) {
>             log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
>         }
>     }
> }



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3140: KAFKA-5309: Stores not queryable after one thread ...

2017-05-24 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/3140

KAFKA-5309: Stores not queryable after one thread died

 - introduces a new thread state DEAD
 - ignores DEAD threads when querying

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka kafka-5309-stores-not-queryable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3140


commit d4fe00bcf401d493595e26d4077060d5edb6878a
Author: Matthias J. Sax 
Date:   2017-05-24T20:17:37Z

KAFKA-5309: Stores not queryable after one thread died
 - introduces a new thread state DEAD
 - ignores DEAD threads when querying




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5309) Stores not queryable after one thread died

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023760#comment-16023760
 ] 

ASF GitHub Bot commented on KAFKA-5309:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/3140

KAFKA-5309: Stores not queryable after one thread died

 - introduces a new thread state DEAD
 - ignores DEAD threads when querying

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka kafka-5309-stores-not-queryable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3140


commit d4fe00bcf401d493595e26d4077060d5edb6878a
Author: Matthias J. Sax 
Date:   2017-05-24T20:17:37Z

KAFKA-5309: Stores not queryable after one thread died
 - introduces a new thread state DEAD
 - ignores DEAD threads when querying




> Stores not queryable after one thread died
> --
>
> Key: KAFKA-5309
> URL: https://issues.apache.org/jira/browse/KAFKA-5309
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> For a multi-threaded Streams instance, it can happen that one thread dies 
> while all other threads (within a single instance) keep running. Even though 
> this is a rare scenario, we should allow querying the stores after the 
> rebalance has finished. However, this never happens, as the dead thread's 
> state is still in the {{KafkaStreams}} thread state map (as {{NOT_RUNNING}}), 
> and thus {{KafkaStreams}} itself stays in state {{REBALANCING}} all the time 
> and never transitions back to {{RUNNING}}.
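A minimal sketch of the idea in the attached PR, with illustrative names rather 
than the actual KafkaStreams internals: a thread that died is tracked as DEAD 
and excluded when deciding whether the instance is queryable.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadStates {
    enum State { CREATED, RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, DEAD }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    void onChange(String threadName, State newState) { states.put(threadName, newState); }

    // The instance counts as RUNNING (and its stores as queryable) when every
    // thread that is still alive is RUNNING; DEAD threads no longer block the
    // transition. (The real logic would also require at least one live thread.)
    boolean storesQueryable() {
        return states.values().stream()
                .filter(s -> s != State.DEAD)
                .allMatch(s -> s == State.RUNNING);
    }
}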



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5309) Stores not queryable after one thread died

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5309:
---
Fix Version/s: 0.11.0.0
   Status: Patch Available  (was: Open)

> Stores not queryable after one thread died
> --
>
> Key: KAFKA-5309
> URL: https://issues.apache.org/jira/browse/KAFKA-5309
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.0.0
>
>
> For a multi-threaded Streams instance, it can happen that one thread dies 
> while all other threads (within a single instance) keep running. Even though 
> this is a rare scenario, we should allow querying the stores after the 
> rebalance has finished. However, this never happens, as the dead thread's 
> state is still in the {{KafkaStreams}} thread state map (as {{NOT_RUNNING}}), 
> and thus {{KafkaStreams}} itself stays in state {{REBALANCING}} all the time 
> and never transitions back to {{RUNNING}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5259) TransactionalId authorization should imply ProducerId authorization

2017-05-24 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5259:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 3075
[https://github.com/apache/kafka/pull/3075]

> TransactionalId authorization should imply ProducerId authorization
> ---
>
> Key: KAFKA-5259
> URL: https://issues.apache.org/jira/browse/KAFKA-5259
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> There is not much point to only authorizing a transactionalId: without 
> producerId authorization, a principal cannot actually write any transactional 
> data. So we may as well make ProducerId authorization implicit if a 
> transactionalId is authorized. 
> There are also a couple cases that we missed in the initial authorization 
> patch which we may as well handle here.
> 1. FindCoordinatorRequest should authorize by transactionalId
> 2. TxnOffsetCommitRequest should also authorize by transactionalId. Currently 
> this field is not included in the request type but it probably should be 
> since then writing any transactional data requires authorization to some 
> transactionalId.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3075: KAFKA-5259: TransactionalId auth implies ProducerI...

2017-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3075


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5259) TransactionalId authorization should imply ProducerId authorization

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023811#comment-16023811
 ] 

ASF GitHub Bot commented on KAFKA-5259:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3075


> TransactionalId authorization should imply ProducerId authorization
> ---
>
> Key: KAFKA-5259
> URL: https://issues.apache.org/jira/browse/KAFKA-5259
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> There is not much point to only authorizing a transactionalId: without 
> producerId authorization, a principal cannot actually write any transactional 
> data. So we may as well make ProducerId authorization implicit if a 
> transactionalId is authorized. 
> There are also a couple cases that we missed in the initial authorization 
> patch which we may as well handle here.
> 1. FindCoordinatorRequest should authorize by transactionalId
> 2. TxnOffsetCommitRequest should also authorize by transactionalId. Currently 
> this field is not included in the request type but it probably should be 
> since then writing any transactional data requires authorization to some 
> transactionalId.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5226) NullPointerException (NPE) in SourceNodeRecordDeserializer.deserialize

2017-05-24 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5226 started by Bill Bejeck.
--
> NullPointerException (NPE) in SourceNodeRecordDeserializer.deserialize
> --
>
> Key: KAFKA-5226
> URL: https://issues.apache.org/jira/browse/KAFKA-5226
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
> Environment: 64-bit Amazon Linux, JDK8
>Reporter: Ian Springer
>Assignee: Bill Bejeck
> Attachments: kafka.log
>
>
> I saw the following NPE in our Kafka Streams app, which has 3 nodes running 
> on 3 separate machines. Out of hundreds of messages processed, the NPE only 
> occurred twice. I am not sure of the cause, so I am unable to reproduce it. 
> I'm hoping the Kafka Streams team can guess the cause based on the stack 
> trace. If I can provide any additional details about our app, please let me 
> know.
>  
> {code}
> INFO  2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser  
> Kafka version : 0.10.2.1
> INFO  2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser  
> Kafka commitId : e89bffd6b2eff799
> INFO  2017-05-10 02:58:26,031 o.s.context.support.DefaultLifecycleProcessor  
> Starting beans in phase 0
> INFO  2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from CREATED to RUNNING.
> INFO  2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] Started 
> Kafka Stream process
> INFO  2017-05-10 02:58:26,086 o.a.k.c.consumer.internals.AbstractCoordinator  
> Discovered coordinator p1kaf1.prod.apptegic.com:9092 (id: 2147482646 rack: 
> null) for group evergage-app.
> INFO  2017-05-10 02:58:26,126 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [] for group evergage-app
> INFO  2017-05-10 02:58:26,126 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from RUNNING to REBALANCING.
> INFO  2017-05-10 02:58:26,127 o.a.k.c.consumer.internals.AbstractCoordinator  
> (Re-)joining group evergage-app
> INFO  2017-05-10 02:58:27,712 o.a.k.c.consumer.internals.AbstractCoordinator  
> Successfully joined group evergage-app with generation 18
> INFO  2017-05-10 02:58:27,716 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO  2017-05-10 02:58:27,716 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to REBALANCING.
> INFO  2017-05-10 02:58:27,729 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> state stores
> INFO  2017-05-10 02:58:27,731 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> processor nodes of the topology
> INFO  2017-05-10 02:58:27,742 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to RUNNING.
> [14 hours pass...]
> INFO  2017-05-10 16:21:27,476 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [us.app.Trigger-0] for group 
> evergage-app
> INFO  2017-05-10 16:21:27,477 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from RUNNING to REBALANCING.
> INFO  2017-05-10 16:21:27,482 o.a.k.c.consumer.internals.AbstractCoordinator  
> (Re-)joining group evergage-app
> INFO  2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.AbstractCoordinator  
> Successfully joined group evergage-app with generation 19
> INFO  2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO  2017-05-10 16:21:27,489 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to REBALANCING.
> INFO  2017-05-10 16:21:27,489 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> processor nodes of the topology
> INFO  2017-05-10 16:21:27,493 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to RUNNING.
> INFO  2017-05-10 16:21:30,584 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [us.app.Trigger-0] for group 
> evergage-app
> INFO  2017-05-10 16:21:30,584 org.apache.kafka.streams.KafkaStreams  
> 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-24 Thread Matthias J. Sax
Jeyhun,

I was just wondering if you did look into the key-deep-copy idea we
discussed. I am curious to see what the impact might be.


-Matthias

On 5/20/17 2:03 AM, Jeyhun Karimov wrote:
> Hi,
> 
> Thanks for your comments. I rethink about including rich functions into
> this KIP.
> I think once we include rich functions in this KIP and then fix
> ProcessorContext in another KIP and incorporate with existing rich
> functions, the code will not be backwards compatible.
> 
> I see Damian's and your point more clearly now.
> 
> Conclusion: we include only withKey interfaces in this KIP (I updated the
> KIP), I will try also initiate another KIP for rich functions.
> 
> Cheers,
> Jeyhun
> 
> On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax 
> wrote:
> 
>> With the current KIP, using ValueMapper and ValueMapperWithKey
>> interfaces, RichFunction seems to be an independent add-on. To fix the
>> original issue to allow key access, RichFunctions are not required IMHO.
>>
>> I initially put the RichFunction idea on the table, because I was hoping
>> to get a uniform API. And I think it was good to consider them --
>> however, the discussion showed that they are not necessary for key
>> access. Thus, it seems to be better to handle RichFunctions in their own
>> KIP. The ProcessorContext/RecordContext issues seem to be a main
>> challenge for this. And introducing RichFunctions with a parameter-less
>> init() method seems not to help too much. We would get an "intermediate"
>> API that we want to change anyway later on...
>>
>> As you put already much effort into RichFunction, feel free do push this
>> further and start a new KIP (we can do this even in parallel) -- we
>> don't want to slow you down :) But it make the discussion and code
>> review easier, if we separate both IMHO.
>>
>>
>> -Matthias
>>
>>
>> On 5/19/17 2:25 AM, Jeyhun Karimov wrote:
>>> Hi Damian,
>>>
>>> Thanks for your comments. I think providing to users *interface* rather
>>> than *abstract class* should be preferred (Matthias also raised this
>> issue
>>> ), anyway I changed the corresponding parts of KIP.
>>>
>>> Regarding with passing additional contextual information, I think it is a
>>> tradeoff,
>>> 1) first, we fix the context parameter for *init() *method in another PR
>>> and solve Rich functions afterwards
>>> 2) first, we fix the requested issues on jira ([1-3]) with providing
>>> (not-complete) Rich functions and integrate the context parameters to
>> this
>>> afterwards (like improvement)
>>>
>>> To me, the second approach seems more incremental. However you are right,
>>> the names might confuse the users.
>>>
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/KAFKA-4218
>>> [2] https://issues.apache.org/jira/browse/KAFKA-4726
>>> [3] https://issues.apache.org/jira/browse/KAFKA-3745
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>>
>>> On Fri, May 19, 2017 at 10:42 AM Damian Guy 
>> wrote:
>>>
 Hi,

 I see you've removed the `ProcessorContext` from the RichFunction which
>> is
 good, but why is it a `RichFunction`? I'd have expected it to pass some
 additional contextual information, i.e., the `RecordContext` that
>> contains
 just the topic, partition, timestamp, offset.  I'm ok with it not
>> passing
 this contextual information, but is the naming incorrect? I'm not sure,
 tbh. I'm wondering if we should drop `RichFunctions` until we can do it
 properly with the correct context?

 Also, i don't like the abstract classes: RichValueMapper,
>> RichValueJoiner,
 RichInitializer etc. Why can't they not just be interfaces? Generally we
 should provide users with Intefaces to code against, not classes.

 Thanks,
 Damian

 On Fri, 19 May 2017 at 00:50 Jeyhun Karimov 
>> wrote:

> Hi,
>
> Thanks. I initiated the PR as well, to get a better overview.
>
> The only reason that I used abstract class instead of interface for
>> Rich
> functions is that in future if we will have some AbstractRichFunction
> abstract classes,
> we can easily extend:
>
> public abstract class RichValueMapper  implements
> ValueMapperWithKey, RichFunction *extends
 AbstractRichFunction*{
> }
>  With interfaces we are only limited to interfaces for inheritance.
>
> However, I think we can easily change it (from abstract class ->
 interface)
> if you think interface is a better fit.
>
>
> Cheers,
> Jeyhun
>
>
> On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax >>
> wrote:
>
>> Thanks for the update and explanations. The KIP is quite improved now
 --
>> great job!
>>
>> One more question: Why are RichValueMapper etc abstract classes and
>> not
>> interfaces?
>>
>>
>> Overall, I like the KIP a lot!
>>
>>
>> -Matthias
>>
>>
>> On 5/16/17 7:03 AM, Jeyhun Karimov wrote:
>>> Hi,
>>>
>>> Thanks for your comments.
>>>
>>> I thi

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-24 Thread Matthias J. Sax
I confess, the term is borrowed from Flink :)

Personally, I never thought about it, but I tend to agree with Michal. I
also want to clarify that the main purpose is the ability to access
record metadata. Thus, it might even be sufficient to only have "init".

An alternative would of course be to pass in the RecordContext as a
method parameter. This would allow us to drop "init()". This might even
allow the use of lambdas, and we could keep the name RichFunction as we
preserve the nature of being a function.
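For illustration, that alternative could look roughly like this; none of these
names exist in Kafka Streams, they are only a sketch of the shape being
discussed:

// Hypothetical single-method interface: the record context is passed as a
// parameter, so no init()/close() life-cycle is needed and it stays lambda-friendly.
public interface ValueMapperWithContext<V, VR> {
    VR apply(V value, RecordContextView context);

    // Stand-in for the metadata that would be exposed.
    interface RecordContextView {
        String topic();
        int partition();
        long offset();
        long timestamp();
    }
}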


-Matthias

On 5/24/17 12:13 PM, Jeyhun Karimov wrote:
> Hi Michal,
> 
> Thanks for your comments. I see your point and I agree with it. However,
> I don't have a better idea for naming. I checked MR source code. There
> it is used JobConfigurable and Closable, two different interfaces. Maybe
> we can rename RichFunction as Configurable? 
> 
> 
> Cheers,
> Jeyhun
> 
> On Tue, May 23, 2017 at 2:58 PM Michal Borowiecki
> mailto:michal.borowie...@openbet.com>>
> wrote:
> 
> Hi Jeyhun,
> 
> I understand your argument about "Rich" in RichFunctions. Perhaps
> I'm just being too puritan here, but let me ask this anyway:
> 
> What is it that makes something a function? To me a function is
> something that takes zero or more arguments and possibly returns a
> value and while it may have side-effects (as opposed to "pure
> functions" which can't), it doesn't have any life-cycle of its own.
> This is what, in my mind, distinguishes the concept of a "function"
> from that of more vaguely defined concepts.
> 
> So if we add a life-cycle to a function, in that understanding, it
> doesn't become a rich function but instead stops being a function
> altogether.
> 
> You could say it's "just semantics" but to me precise use of
> language in the given context is an important foundation for good
> engineering. And in the context of programming "function" has a
> precise meaning. Of course we can say that in the context of Kafka
> Streams "function" has a different, looser meaning but I'd argue
> that won't do anyone any good.
> 
> On the other hand other frameworks such as Flink use this
> terminology, so it could be that consistency is the reason. I'm
> guessing that's why the name was proposed in the first place. My
> point is simply that it's a poor choice of wording and Kafka Streams
> don't have to follow that to the letter.
> 
> Cheers,
> 
> Michal
> 
> 
> On 23/05/17 13:26, Jeyhun Karimov wrote:
>> Hi Michal,
>>
>> Thanks for your comments.
>>
>>
>> To me at least it feels strange that something is called a
>> function yet doesn't follow the functional interface
>> definition of having just one abstract method. I suppose init
>> and close could be made default methods with empty bodies once
>> Java 7 support is dropped to mitigate that concern. Still, I
>> feel some resistance to consider something that requires
>> initialisation and closing (which implies holding state) as
>> being a function. Sounds more like the Processor/Transformer
>> kind of thing semantically, rather than a function. 
>>
>>
>>  -  If we called the interface name only Function your assumptions
>> will hold. However, the keyword Rich by definition implies that we
>> have a function (as you described, with one abstract method and
>> etc) but it is rich. So, there are multiple methods in it. 
>> Ideally it should be:
>>
>> public interface RichFunction extends Function {  // this
>> is the Function that you described
>>   void close();
>>   void init(Some params);
>>...
>> }
>>
>>
>> The KIP says there are multiple use-cases for this but doesn't
>> enumerate any - I think some examples would be useful,
>> otherwise that section sounds a little bit vague. 
>>
>>
>> I thought it is obvious by definition but I will update it. Thanks. 
>>
>>
>> IMHO, it's the access to the RecordContext is where the added
>> value lies but maybe I'm just lacking in imagination, so I'm
>> asking all this to better understand the rationale for init()
>> and close().
>>
>>
>> Maybe I should add some examples. Thanks. 
>>
>>
>> Cheers,
>> Jeyhun
>>
>> On Mon, May 22, 2017 at 11:02 AM, Michal Borowiecki
>> > > wrote:
>>
>> Hi Jeyhun,
>>
>> I'd like to understand better the premise of RichFunctions and
>> why |init(Some params)|,| close() |are said to be needed.
>>
>> To me at least it feels strange that something is called a
>> function yet doesn't follow the functional interface
>> definition of having just one abstract method. I suppose init
>> and close could be made default methods with empty bodies once
>> Java 7 support is dropped to mitigate that con

[jira] [Created] (KAFKA-5324) AdminClient: add close with timeout, fix some timeout bugs

2017-05-24 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-5324:
--

 Summary: AdminClient: add close with timeout, fix some timeout bugs
 Key: KAFKA-5324
 URL: https://issues.apache.org/jira/browse/KAFKA-5324
 Project: Kafka
  Issue Type: Bug
Reporter: Colin P. McCabe


* Add a close method with a timeout, similar to some other APIs.  Close waits 
for all calls to complete.  Once the timeout expires, all calls are aborted.

* Fix a minor bug which made it impossible to have per-call timeouts which were 
longer than the default timeout.

* Fix a minor bug where we could oversleep short timeouts because we didn't 
adjust the polling interval based on the extant timeouts.

* Add some unit tests for timeouts



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5324) AdminClient: add close with timeout, fix some timeout bugs

2017-05-24 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe reassigned KAFKA-5324:
--

Assignee: Colin P. McCabe

> AdminClient: add close with timeout, fix some timeout bugs
> --
>
> Key: KAFKA-5324
> URL: https://issues.apache.org/jira/browse/KAFKA-5324
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> * Add a close method with a timeout, similar to some other APIs.  Close waits 
> for all calls to complete.  Once the timeout expires, all calls are aborted.
> * Fix a minor bug which made it impossible to have per-call timeouts which 
> were longer than the default timeout.
> * Fix a minor bug where we could oversleep short timeouts because we didn't 
> adjust the polling interval based on the extant timeouts.
> * Add some unit tests for timeouts



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5324) AdminClient: add close with timeout, fix some timeout bugs

2017-05-24 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5324:
---
Affects Version/s: 0.11.0.0
  Component/s: admin

> AdminClient: add close with timeout, fix some timeout bugs
> --
>
> Key: KAFKA-5324
> URL: https://issues.apache.org/jira/browse/KAFKA-5324
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> * Add a close method with a timeout, similar to some other APIs.  Close waits 
> for all calls to complete.  Once the timeout expires, all calls are aborted.
> * Fix a minor bug which made it impossible to have per-call timeouts which 
> were longer than the default timeout.
> * Fix a minor bug where we could oversleep short timeouts because we didn't 
> adjust the polling interval based on the extant timeouts.
> * Add some unit tests for timeouts



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-24 Thread Matthias J. Sax
One more question:

Should we add any of
 - InitializerWithKey
 - ReducerWithKey
 - ValueTransformerWithKey

To get a consistent/complete API, it might be a good idea. Any thoughts?


-Matthias


On 5/24/17 3:47 PM, Matthias J. Sax wrote:
> Jeyhun,
> 
> I was just wondering if you did look into the key-deep-copy idea we
> discussed. I am curious to see what the impact might be.
> 
> 
> -Matthias
> 
> On 5/20/17 2:03 AM, Jeyhun Karimov wrote:
>> Hi,
>>
>> Thanks for your comments. I rethink about including rich functions into
>> this KIP.
>> I think once we include rich functions in this KIP and then fix
>> ProcessorContext in another KIP and incorporate with existing rich
>> functions, the code will not be backwards compatible.
>>
>> I see Damian's and your point more clearly now.
>>
>> Conclusion: we include only withKey interfaces in this KIP (I updated the
>> KIP), I will try also initiate another KIP for rich functions.
>>
>> Cheers,
>> Jeyhun
>>
>> On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax 
>> wrote:
>>
>>> With the current KIP, using ValueMapper and ValueMapperWithKey
>>> interfaces, RichFunction seems to be an independent add-on. To fix the
>>> original issue to allow key access, RichFunctions are not required IMHO.
>>>
>>> I initially put the RichFunction idea on the table, because I was hoping
>>> to get a uniform API. And I think it was good to consider them --
>>> however, the discussion showed that they are not necessary for key
>>> access. Thus, it seems to be better to handle RichFunctions in their own
>>> KIP. The ProcessorContext/RecordContext issues seem to be a main
>>> challenge for this. And introducing RichFunctions with a parameter-less
>>> init() method seems not to help too much. We would get an "intermediate"
>>> API that we want to change anyway later on...
>>>
>>> As you put already much effort into RichFunction, feel free do push this
>>> further and start a new KIP (we can do this even in parallel) -- we
>>> don't want to slow you down :) But it make the discussion and code
>>> review easier, if we separate both IMHO.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 5/19/17 2:25 AM, Jeyhun Karimov wrote:
 Hi Damian,

 Thanks for your comments. I think providing to users *interface* rather
 than *abstract class* should be preferred (Matthias also raised this
>>> issue
 ), anyway I changed the corresponding parts of KIP.

 Regarding with passing additional contextual information, I think it is a
 tradeoff,
 1) first, we fix the context parameter for *init() *method in another PR
 and solve Rich functions afterwards
 2) first, we fix the requested issues on jira ([1-3]) with providing
 (not-complete) Rich functions and integrate the context parameters to
>>> this
 afterwards (like improvement)

 To me, the second approach seems more incremental. However you are right,
 the names might confuse the users.



 [1] https://issues.apache.org/jira/browse/KAFKA-4218
 [2] https://issues.apache.org/jira/browse/KAFKA-4726
 [3] https://issues.apache.org/jira/browse/KAFKA-3745


 Cheers,
 Jeyhun


 On Fri, May 19, 2017 at 10:42 AM Damian Guy 
>>> wrote:

> Hi,
>
> I see you've removed the `ProcessorContext` from the RichFunction which
>>> is
> good, but why is it a `RichFunction`? I'd have expected it to pass some
> additional contextual information, i.e., the `RecordContext` that
>>> contains
> just the topic, partition, timestamp, offset.  I'm ok with it not
>>> passing
> this contextual information, but is the naming incorrect? I'm not sure,
> tbh. I'm wondering if we should drop `RichFunctions` until we can do it
> properly with the correct context?
>
> Also, i don't like the abstract classes: RichValueMapper,
>>> RichValueJoiner,
> RichInitializer etc. Why can't they not just be interfaces? Generally we
> should provide users with Intefaces to code against, not classes.
>
> Thanks,
> Damian
>
> On Fri, 19 May 2017 at 00:50 Jeyhun Karimov 
>>> wrote:
>
>> Hi,
>>
>> Thanks. I initiated the PR as well, to get a better overview.
>>
>> The only reason that I used abstract class instead of interface for
>>> Rich
>> functions is that in future if we will have some AbstractRichFunction
>> abstract classes,
>> we can easily extend:
>>
>> public abstract class RichValueMapper  implements
>> ValueMapperWithKey, RichFunction *extends
> AbstractRichFunction*{
>> }
>>  With interfaces we are only limited to interfaces for inheritance.
>>
>> However, I think we can easily change it (from abstract class ->
> interface)
>> if you think interface is a better fit.
>>
>>
>> Cheers,
>> Jeyhun
>>
>>
>> On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax >>>
>> wrote:
>>
>>> Thanks for the update and explanations. The KI

[GitHub] kafka pull request #3141: KAFKA-5324: AdminClient: add close with timeout, f...

2017-05-24 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3141

KAFKA-5324: AdminClient: add close with timeout, fix some timeout bugs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5324

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3141


commit 88f1ca542cdfb151bc5ff0153cfe37364d687a36
Author: Colin P. Mccabe 
Date:   2017-05-23T23:36:05Z

KAFKA-5324: AdminClient: add close with timeout, fix some timeout bugs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5324) AdminClient: add close with timeout, fix some timeout bugs

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023858#comment-16023858
 ] 

ASF GitHub Bot commented on KAFKA-5324:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3141

KAFKA-5324: AdminClient: add close with timeout, fix some timeout bugs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5324

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3141


commit 88f1ca542cdfb151bc5ff0153cfe37364d687a36
Author: Colin P. Mccabe 
Date:   2017-05-23T23:36:05Z

KAFKA-5324: AdminClient: add close with timeout, fix some timeout bugs




> AdminClient: add close with timeout, fix some timeout bugs
> --
>
> Key: KAFKA-5324
> URL: https://issues.apache.org/jira/browse/KAFKA-5324
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> * Add a close method with a timeout, similar to some other APIs.  Close waits 
> for all calls to complete.  Once the timeout expires, all calls are aborted.
> * Fix a minor bug which made it impossible to have per-call timeouts which 
> were longer than the default timeout.
> * Fix a minor bug where we could oversleep short timeouts because we didn't 
> adjust the polling interval based on the extant timeouts.
> * Add some unit tests for timeouts



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4340) Change the default value of log.message.timestamp.difference.max.ms to the same as log.retention.ms

2017-05-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023862#comment-16023862
 ] 

Jun Rao commented on KAFKA-4340:


[~edenhill], as Jiangjie said, currently the broker takes the whole batch as 
the unit. So it's a bit weird for the broker to take only some of the messages 
in a batch. This is especially true with the EOS work, in which the producer 
wants to commit multiple messages all or nothing. If the broker silently 
rejects some messages without telling the producer, the EOS guarantees will be 
broken.

Your point is valid, though, in that if the broker rejects the whole batch, the 
client currently doesn't know which messages failed the timestamp check. One 
way to address this is to include the array indexes of the messages failing the 
timestamp check in the partition-level produce response. Then the client still 
fails all messages in the batch, but with a different error code in the 
callback depending on whether the message failed the timestamp check or not. 
That way, the application can choose to resend the failed messages with a good 
timestamp, if desired. 

> Change the default value of log.message.timestamp.difference.max.ms to the 
> same as log.retention.ms
> ---
>
> Key: KAFKA-4340
> URL: https://issues.apache.org/jira/browse/KAFKA-4340
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> [~junrao] brought up the following scenario: 
> If users are pumping data with timestamp already passed log.retention.ms into 
> Kafka, the messages will be appended to the log but will be immediately 
> rolled out by log retention thread when it kicks in and the messages will be 
> deleted. 
> To avoid this produce-and-deleted scenario, we can set the default value of 
> log.message.timestamp.difference.max.ms to be the same as log.retention.ms.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4785:
---
Fix Version/s: 0.11.0.0
   Status: Patch Available  (was: Open)

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
> Fix For: 0.11.0.0
>
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply the 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams explicitly sets the current "stream time" as the record 
> metadata timestamp before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this assumption.
> Thus, when reading data from an intermediate repartitioning topic, the 
> configured timestamp extractor should not be used; instead, the record's 
> metadata timestamp should be extracted as the record timestamp.
> In order to get the same behavior for intermediate user topics (i.e., used 
> in {{through()}}), we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., set 
> {{FailOnInvalidTimestampExtractor}}).
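Illustrative only: a trivial extractor that returns the record's metadata 
timestamp, which is what the issue proposes to enforce for records read back 
from internal repartition topics. The interface wiring is omitted because the 
exact TimestampExtractor signature depends on the Streams version.

import org.apache.kafka.clients.consumer.ConsumerRecord;

class MetadataTimestampExtractor {
    long extract(ConsumerRecord<Object, Object> record) {
        // Streams stamps the current "stream time" into the record metadata before
        // writing to a repartition topic, so reading it back preserves that choice.
        return record.timestamp();
    }
}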



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3142: KAFKA-5316: LogCleaner should account for larger r...

2017-05-24 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3142

KAFKA-5316: LogCleaner should account for larger record sets after cleaning



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-5316

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3142


commit e8981baa183444870f8ab1d6634e4e3f634371c2
Author: Jason Gustafson 
Date:   2017-05-24T23:41:49Z

KAFKA-5316: LogCleaner should account for larger record sets after cleaning




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5316) Log cleaning can increase message size and cause cleaner to crash with buffer overflow

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023898#comment-16023898
 ] 

ASF GitHub Bot commented on KAFKA-5316:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3142

KAFKA-5316: LogCleaner should account for larger record sets after cleaning



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-5316

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3142


commit e8981baa183444870f8ab1d6634e4e3f634371c2
Author: Jason Gustafson 
Date:   2017-05-24T23:41:49Z

KAFKA-5316: LogCleaner should account for larger record sets after cleaning




> Log cleaning can increase message size and cause cleaner to crash with buffer 
> overflow
> --
>
> Key: KAFKA-5316
> URL: https://issues.apache.org/jira/browse/KAFKA-5316
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> We have observed in practice that it is possible for a compressed record set 
> to grow after cleaning. Since the size of the cleaner's input and output 
> buffers are identical, this can lead to overflow of the output buffer:
> {code}
> [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
> at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
> at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
> at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
> at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> {code}
> It is also then possible for a compressed message set to grow beyond the max 
> message size. Due to the changes in KIP-74 to alter fetch semantics, the 
> suggestion for this case is to allow the recompressed message set to exceed 
> the max message size. This should be rare in practice and won't prevent 
> consumers from making progress.
> To handle the overflow issue, one option is to allocate a temporary buffer 
> when filtering in {{MemoryRecords.filterTo}} and return it in the result. As 
> an optimization, we can resort to this only when there is a single 
> recompressed message set which is larger than the entire write buffer. 
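A sketch of the "temporary larger buffer" fallback mentioned in the last 
paragraph; this is not the actual MemoryRecords.filterTo code, just the shape of 
the idea:

import java.nio.ByteBuffer;

class GrowableOutput {
    private ByteBuffer buffer;
    GrowableOutput(ByteBuffer initial) { this.buffer = initial; }

    // Write a cleaned record set, growing the buffer instead of overflowing when
    // recompression made the set larger than the original.
    void write(ByteBuffer recordSet) {
        if (recordSet.remaining() > buffer.remaining()) {
            ByteBuffer larger = ByteBuffer.allocate(buffer.position() + recordSet.remaining());
            buffer.flip();
            larger.put(buffer);
            buffer = larger;
        }
        buffer.put(recordSet);
    }

    ByteBuffer result() { return buffer; }
}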



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-24 Thread Kyle Winkelman
Yea, I really like that idea. I'll see what I can do to update the KIP and my
PR when I have some time. I'm not sure how well creating the
KStreamAggregates will go, though, because at that point I will have thrown
away the type of the values. It will be type safe; I just may need to do a
little forcing.

Thanks,
Kyle

On May 24, 2017 3:28 PM, "Guozhang Wang"  wrote:

> Kyle,
>
> Thanks for the explanations, my previous read on the wiki examples was
> wrong.
>
> So I guess my motivation should be "reduced" to: can we move the window
> specs param from "KGroupedStream#cogroup(..)" to
> "CogroupedKStream#aggregate(..)", and my motivations are:
>
> 1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
> 2. major: this is for extensibility of the APIs, and since we are removing
> the "Evolving" annotations on Streams it may be harder to change it again
> in the future. The extended use case is that people want to have windowed
> running aggregates at different granularities, e.g. "give me the counts
> per-minute, per-hour, per-day and per-week", and today in the DSL we need to
> specify that case with multiple aggregate operators, each of which gets its
> own state store / changelog, etc. It is possible to optimize this into a
> single state store. Its implementation would be tricky, as you need to
> contain windows of different lengths within your window store, but just from
> the public API point of view, it could be specified as:
>
> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
>
> table1 = stream.aggregate(/*per-minute window*/)
> table2 = stream.aggregate(/*per-hour window*/)
> table3 = stream.aggregate(/*per-day window*/)
>
> while underlying we are only using a single store "state-store-name" for
> it.
>
>
> Although this feature is out of the scope of this KIP, I'd like to discuss
> if we can "leave the door open" to make such changes without modifying the
> public APIs.
>
> Guozhang
>
>
> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman 
> wrote:
>
> > I allow defining a single window/session window one time when you make the
> > cogroup call from a KGroupedStream. From then on you are using the cogroup
> > call from within CogroupedKStream, which doesn't accept any additional
> > windows/session windows.
> >
> > Is this what you meant by your question or did I misunderstand?
> >
> > On May 23, 2017 9:33 PM, "Guozhang Wang"  wrote:
> >
> > Another question that came to me is on "window alignment": from the KIP it
> > seems you are allowing users to specify a (potentially different) window
> > spec for each co-grouped input stream. So if these window specs are
> > different, how should we "align" them across the different input streams? I
> > think it is more natural to only specify one window spec in the
> >
> > KTable CogroupedKStream#aggregate(Windows);
> >
> >
> > And remove it from the cogroup() functions. WDYT?
> >
> >
> > Guozhang
> >
> > On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks for the proposal, Kyle. This is a quite common use case: supporting
> > > such a multi-way table join (i.e. N source tables with N aggregate
> > > functions) with a single store and N+1 serdes. I have seen lots of people
> > > using the low-level PAPI to achieve this goal.
> > >
> > >
> > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> > winkelman.k...@gmail.com
> > > > wrote:
> > >
> > >> I like your point about not handling other cases such as count and
> > >> reduce.
> > >>
> > >> I think that reduce may not make sense because reduce assumes that the
> > >> input values are the same as the output values. With cogroup there may
> > >> be multiple different input types, and then your output type can't be
> > >> multiple different things. In the case where you have all matching value
> > >> types you can do KStreamBuilder#merge followed by the reduce.
> > >>
> > >> As for count, I think it is possible to call count on all the individual
> > >> grouped streams and then do joins. Otherwise we could maybe make a
> > >> special call in KGroupedStream for this case, because in this case we
> > >> don't need to do type checking on the values. It could be similar to the
> > >> current count methods but accept varargs of additional grouped streams
> > >> as well and make sure they have a key type of K.
> > >>
> > >> The way I have put the KIP together is to ensure that we do type
> > >> checking. I don't see a way we could group them all first and then make
> > >> a call to count, reduce, or aggregate, because with aggregate they would
> > >> need to pass a list of aggregators and we would have no way of type
> > >> checking that they match the grouped streams.
> > >>
> > >> Thanks,
> > >> Kyle
> > >>
> > >> On May 19, 2017 11:42 AM, "Xavier Léauté" 
> wrote:
> > >>
> > >> > Sorry to jump on this thread so late. I agree this is a very useful
> > >> > addition and wanted to provide an additional use-case and some more
> > >> > com

[GitHub] kafka pull request #3109: KAFKA-4144: Allow per stream/table timestamp extra...

2017-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3109


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024036#comment-16024036
 ] 

ASF GitHub Bot commented on KAFKA-4144:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3109


> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
> Fix For: 0.11.0.0
>
>
> At the moment the timestamp extractor is configured via a {{StreamsConfig}} 
> value passed to {{KafkaStreams}}. That means you can only have a single 
> timestamp extractor per app, even though you may be joining multiple 
> streams/tables that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For the PAPI we cannot enforce this, but in the 
> DSL {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to override the global 
> extractor from {{StreamsConfig}} (i.e., set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785, though.
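For illustration only, the difference between today's global configuration and
the kind of per-source override the KIP proposes might look like the sketch
below. The per-source overload and the extractor class name are hypothetical;
the final signatures are defined by the KIP/PR, not here.
{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TimestampExtractorConfigExample {
    public static void main(String[] args) {
        // Today: a single extractor for the whole application, set via StreamsConfig.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  "com.example.MyEventTimeExtractor");   // hypothetical extractor class

        // Proposed direction (hypothetical overload, for illustration only):
        // KStream<String, Order> orders =
        //     builder.stream(new OrderTimeExtractor(), keySerde, orderSerde, "orders");
        System.out.println(props);
    }
}
{code}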



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024047#comment-16024047
 ] 

Ma Tianchi commented on KAFKA-5319:
---

[~mih...@wp.pl] You are right; I am new to JIRA. I have reopened it.

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
>
> When a new broker is added to the cluster, it does not host any topics yet. 
> When we use the console command to create a topic without 
> 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to 
> compute the replica assignment. Even if the assignment is balanced at 
> creation time, as more and more brokers are added to the cluster the replica 
> balance degrades. We can use 'kafka-reassign-partitions.sh' to rebalance, but 
> the large number of topics makes that difficult. Also, at topic creation time 
> Kafka chooses a preferred replica leader, placed at the first position of the 
> AR, to balance leadership, but that balance is destroyed when the cluster 
> changes. Using 'kafka-reassign-partitions.sh' for partition reassignment may 
> also destroy the leader balance, because the user can change the AR of a 
> partition. The cluster may then be unbalanced, yet Kafka considers leader 
> balance to be fine as long as every leader is the first replica in its AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders across all brokers. It uses an algorithm to compute a balanced 
> replica and leader reassignment, then uses ReassignPartitionsCommand to 
> actually rebalance the cluster.
> It can be used to rebalance after brokers are added or whenever the cluster 
> is unbalanced. It does not handle moving replicas from a dead broker to a 
> live broker; it only balances replicas and leaders across live brokers.
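As a purely illustrative sketch (this is not the attached algorithm; the broker
IDs, topic name, counts, and round-robin policy are assumptions), a balanced
assignment could be turned into input for kafka-reassign-partitions.sh roughly
like this:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration only: round-robin replica placement emitted as input for
// kafka-reassign-partitions.sh.
public class ToyRebalancer {
    public static void main(String[] args) {
        List<Integer> brokers = Arrays.asList(1, 2, 3);   // assumed live brokers
        String topic = "my-topic";                        // assumed topic
        int partitions = 6;
        int replicationFactor = 2;

        StringBuilder json = new StringBuilder("{\"version\":1,\"partitions\":[");
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // The first replica (preferred leader) rotates across brokers so
                // that leadership is spread evenly as well.
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            json.append(String.format("{\"topic\":\"%s\",\"partition\":%d,\"replicas\":%s}",
                    topic, p, replicas));
            if (p < partitions - 1) json.append(",");
        }
        json.append("]}");
        // Feed the output to: kafka-reassign-partitions.sh --reassignment-json-file ... --execute
        System.out.println(json);
    }
}
{code}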



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5211) KafkaConsumer should not skip a corrupted record after throwing an exception.

2017-05-24 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024066#comment-16024066
 ] 

Jason Gustafson commented on KAFKA-5211:


[~enothereska] Looking toward the future, it would be good to get your take on 
what the ideal behavior would be for Kafka streams. Thanks to some of the 
improvements in this release cycle, the current "intended" semantics may now 
actually be usable (which wasn't the case before--as is apparent from the 
discussion in KAFKA-4740). When we raise an exception from parsing or record 
corruption, the consumer's position should now be pointing to the offset of the 
record that failed parsing, which means the user can seek past it if they wish. 
Because of the reasons mentioned by Becket in the description, I don't think it 
will be safe to proactively skip past a corrupt record--at least not without 
refetching the data--but it may be possible for parsing errors. It has also 
been proposed to move parsing out of the consumer entirely (see KAFKA-1895), 
which would neatly solve part of the problem, but would likely require a KIP. 
Given the short horizon for this release, however, Becket's patch is probably 
the way to go for now.
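
To make the "seek past it" option concrete for the parsing-error case, an
application-side sketch could look like the following. The topic/partition here
is assumed, it relies on the position pointing at the failing record as
described above, and it is not safe for genuinely corrupt records:
{code}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

// Sketch only: skip a record that failed deserialization.
final class SkipBadRecordsExample {
    static void pollSkippingBadRecords(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("events", 0);   // assumed partition
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // ... process records ...
            } catch (SerializationException e) {
                long badOffset = consumer.position(tp);  // offset of the record that failed parsing
                consumer.seek(tp, badOffset + 1);        // skip it and continue
            }
        }
    }
}
{code}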

> KafkaConsumer should not skip a corrupted record after throwing an exception.
> -
>
> Key: KAFKA-5211
> URL: https://issues.apache.org/jira/browse/KAFKA-5211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>  Labels: clients, consumer
> Fix For: 0.11.0.0
>
>
> In 0.10.2, when there is a corrupted record, KafkaConsumer.poll() will throw 
> an exception and block on that corrupted record. In the latest trunk this 
> behavior has changed to skip the corrupted record (which is the old consumer 
> behavior). With KIP-98, skipping corrupted messages would be a little 
> dangerous as the message could be a control message for a transaction. We 
> should fix the issue to let the KafkaConsumer block on the corrupted messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5211) KafkaConsumer should not skip a corrupted record after throwing an exception.

2017-05-24 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024066#comment-16024066
 ] 

Jason Gustafson edited comment on KAFKA-5211 at 5/25/17 2:21 AM:
-

[~enothereska] Looking toward the future, it would be good to get your take on 
what the ideal behavior would be for Kafka streams. Thanks to some of the 
improvements in this release cycle, the current "intended" semantics may now 
actually be usable (which wasn't the case before-as is apparent from the 
discussion in KAFKA-4740). When we raise an exception from parsing or record 
corruption, the consumer's position should now be pointing to the offset of the 
record that failed parsing, which means the user can seek past it if they wish. 
Because of the reasons mentioned by Becket in the description, I don't think it 
will be safe to proactively skip past a corrupt record-at least not without 
refetching the data--but it may be possible for parsing errors. It has also 
been proposed to move parsing out of the consumer entirely (see KAFKA-1895), 
which would neatly solve part of the problem, but would likely require a KIP. 
Given the short horizon for this release, however, Becket's patch is probably 
the way to go for now.


was (Author: hachikuji):
[~enothereska] Looking toward the future, it would be good to get your take on 
what the ideal behavior would be for Kafka streams. Thanks to some of the 
improvements in this release cycle, the current "intended" semantics may now 
actually be usable (which wasn't the case before--as is apparent from the 
discussion in KAFKA-4740). When we raise an exception from parsing or record 
corruption, the consumer's position should now be pointing to the offset of the 
record that failed parsing, which means the user can seek past it if they wish. 
Because of the reasons mentioned by Becket in the description, I don't think it 
will be safe to proactively skip past a corrupt record--at least not without 
refetching the data--but it may be possible for parsing errors. It has also 
been proposed to move parsing out of the consumer entirely (see KAFKA-1895), 
which would neatly solve part of the problem, but would likely require a KIP. 
Given the short horizon for this release, however, Becket's patch is probably 
the way to go for now.

> KafkaConsumer should not skip a corrupted record after throwing an exception.
> -
>
> Key: KAFKA-5211
> URL: https://issues.apache.org/jira/browse/KAFKA-5211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>  Labels: clients, consumer
> Fix For: 0.11.0.0
>
>
> In 0.10.2, when there is a corrupted record, KafkaConsumer.poll() will throw 
> an exception and block on that corrupted record. In the latest trunk this 
> behavior has changed to skip the corrupted record (which is the old consumer 
> behavior). With KIP-98, skipping corrupted messages would be a little 
> dangerous as the message could be a control message for a transaction. We 
> should fix the issue to let the KafkaConsumer block on the corrupted messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4562) deadlock heartbeat, metadata-manager, request-handler

2017-05-24 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-4562.

Resolution: Duplicate

Pretty sure this was fixed in KAFKA-3994.

> deadlock heartbeat, metadata-manager, request-handler
> -
>
> Key: KAFKA-4562
> URL: https://issues.apache.org/jira/browse/KAFKA-4562
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: rhel7, java 1.8.0_77, vmware, two broker setup
>Reporter: Hans Kowallik
>
> Found one Java-level deadlock:
> =
> "executor-Heartbeat":
>   waiting to lock monitor 0x7f8c9c954378 (object 0x0006cd17dd18, a 
> kafka.coordinator.GroupMetadata),
>   which is held by "group-metadata-manager-0"
> "group-metadata-manager-0":
>   waiting to lock monitor 0x7f8d60002bc8 (object 0x0006d9e386e8, a 
> java.util.LinkedList),
>   which is held by "kafka-request-handler-1"
> "kafka-request-handler-1":
>   waiting to lock monitor 0x7f8c9c954378 (object 0x0006cd17dd18, a 
> kafka.coordinator.GroupMetadata),
>   which is held by "group-metadata-manager-0"
> When this happens, RAM usage, network connections, and thread counts increase 
> linearly.
> controller can't talk to local broker:
> [2016-12-19 16:22:44,639] INFO 
> [Controller-614897-to-broker-614897-send-thread], Controller 614897 connected 
> to kafka-dev-614897.lhotse.ov.otto.de:9092 (id: 614897 rack: null) for sending
>  state change requests (kafka.controller.RequestSendThread)
> replication thread can't talk to remote broker:
> [2016-12-19 16:22:42,014] WARN [ReplicaFetcherThread-0-614897], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6cae17f6 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 614897 was disconnected before the 
> response was read
> No failover happens until the machine runs out of swap space or Kafka is 
> restarted manually.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3133: MINOR: GroupCoordinator can append with group lock

2017-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3133


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi updated KAFKA-5319:
--
Issue Type: New Feature  (was: Improvement)

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
>
> When a new broker is added to the cluster, it does not host any topics yet. 
> When we use the console command to create a topic without 
> 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to 
> compute the replica assignment. Even if the assignment is balanced at 
> creation time, as more and more brokers are added to the cluster the replica 
> balance degrades. We can use 'kafka-reassign-partitions.sh' to rebalance, but 
> the large number of topics makes that difficult. Also, at topic creation time 
> Kafka chooses a preferred replica leader, placed at the first position of the 
> AR, to balance leadership, but that balance is destroyed when the cluster 
> changes. Using 'kafka-reassign-partitions.sh' for partition reassignment may 
> also destroy the leader balance, because the user can change the AR of a 
> partition. The cluster may then be unbalanced, yet Kafka considers leader 
> balance to be fine as long as every leader is the first replica in its AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders across all brokers. It uses an algorithm to compute a balanced 
> replica and leader reassignment, then uses ReassignPartitionsCommand to 
> actually rebalance the cluster.
> It can be used to rebalance after brokers are added or whenever the cluster 
> is unbalanced. It does not handle moving replicas from a dead broker to a 
> live broker; it only balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront

2017-05-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024223#comment-16024223
 ] 

Onur Karaman commented on KAFKA-5323:
-

This can have a larger impact than one might initially suspect. For instance, a 
broker only populates its MetadataCache after it has joined the cluster and the 
controller sends it an UpdateMetadataRequest. But a broker can begin processing 
requests even before registering itself in zookeeper (before the controller 
even knows the broker is alive). In other words, a broker can begin processing 
MetadataRequests before processing the controller's UpdateMetadataRequest 
following broker registration.

Processing these MetadataRequests in this scenario leads to large local times 
and can cause substantial request queue backup, causing significant delays in 
the broker processing its initial UpdateMetadataRequest. Since the broker 
hasn't received any UpdateMetadataRequest from the controller yet, its 
MetadataCache is empty. So the topics from all the client MetadataRequests are 
treated as brand new topics, which means the broker tries to auto create these 
topics. For each pre-existing topic queried in the MetadataRequest, auto topic 
creation performs the N+2 zookeeper reads mentioned earlier.

In one bad production scenario (while recovering from KAFKA-4959), this caused 
a significant delay in bringing replicas online, as both the initial 
LeaderAndIsrRequest and UpdateMetadataRequest from the controller on broker 
startup were stuck behind these client MetadataRequests hammering zookeeper.

> AdminUtils.createTopic should check topic existence upfront
> ---
>
> Key: KAFKA-5323
> URL: https://issues.apache.org/jira/browse/KAFKA-5323
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper 
> reads where N is the number of brokers. Here is the breakdown of the N+2 
> zookeeper reads:
> # reads the current list of brokers in zookeeper (1 zookeeper read)
> # reads metadata for each broker in zookeeper (N zookeeper reads where N is 
> the number of brokers)
> # checks for topic existence in zookeeper (1 zookeeper read)
> We can reduce the N+2 reads down to 1 by checking topic existence upfront.
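For illustration, the single-read existence check can be pictured with the
plain ZooKeeper client as below; the real change lives in the Scala AdminUtils
code, so the class and method here are just a sketch of the idea.
{code}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: a topic exists iff its znode under /brokers/topics is present,
// so existence can be established with a single zookeeper read.
final class TopicExistenceCheck {
    static boolean topicExists(ZooKeeper zk, String topic)
            throws KeeperException, InterruptedException {
        return zk.exists("/brokers/topics/" + topic, false) != null;
    }
}
{code}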



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Ma Tianchi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ma Tianchi updated KAFKA-5319:
--
Attachment: ArithmeticDescription.png

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ArithmeticDescription.png
>
>
> When a new broker is added to the cluster, it does not host any topics yet. 
> When we use the console command to create a topic without 
> 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to 
> compute the replica assignment. Even if the assignment is balanced at 
> creation time, as more and more brokers are added to the cluster the replica 
> balance degrades. We can use 'kafka-reassign-partitions.sh' to rebalance, but 
> the large number of topics makes that difficult. Also, at topic creation time 
> Kafka chooses a preferred replica leader, placed at the first position of the 
> AR, to balance leadership, but that balance is destroyed when the cluster 
> changes. Using 'kafka-reassign-partitions.sh' for partition reassignment may 
> also destroy the leader balance, because the user can change the AR of a 
> partition. The cluster may then be unbalanced, yet Kafka considers leader 
> balance to be fine as long as every leader is the first replica in its AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders across all brokers. It uses an algorithm to compute a balanced 
> replica and leader reassignment, then uses ReassignPartitionsCommand to 
> actually rebalance the cluster.
> It can be used to rebalance after brokers are added or whenever the cluster 
> is unbalanced. It does not handle moving replicas from a dead broker to a 
> live broker; it only balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1589

2017-05-24 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: GroupCoordinator can append with group lock

--
[...truncated 2.47 MB...]
org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse STARTED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testSingleOption STARTED

org.apache.kafka.common.security.JaasContextTest > testSingleOption PASSED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes STARTED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes PASSED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
STARTED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
PASSED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue STARTED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameAndFallback STARTED

org.apache.kafka.common

[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread LiangHong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024279#comment-16024279
 ] 

LiangHong commented on KAFKA-5319:
--

The attachment looks good.

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ArithmeticDescription.png
>
>
> When a new broker is added to the cluster, it does not host any topics yet. 
> When we use the console command to create a topic without 
> 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to 
> compute the replica assignment. Even if the assignment is balanced at 
> creation time, as more and more brokers are added to the cluster the replica 
> balance degrades. We can use 'kafka-reassign-partitions.sh' to rebalance, but 
> the large number of topics makes that difficult. Also, at topic creation time 
> Kafka chooses a preferred replica leader, placed at the first position of the 
> AR, to balance leadership, but that balance is destroyed when the cluster 
> changes. Using 'kafka-reassign-partitions.sh' for partition reassignment may 
> also destroy the leader balance, because the user can change the AR of a 
> partition. The cluster may then be unbalanced, yet Kafka considers leader 
> balance to be fine as long as every leader is the first replica in its AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders across all brokers. It uses an algorithm to compute a balanced 
> replica and leader reassignment, then uses ReassignPartitionsCommand to 
> actually rebalance the cluster.
> It can be used to rebalance after brokers are added or whenever the cluster 
> is unbalanced. It does not handle moving replicas from a dead broker to a 
> live broker; it only balances replicas and leaders across live brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)