[jira] [Created] (KAFKA-8023) Improve global state store restoration by using multiple update threads

2019-03-01 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8023:
-

 Summary: Improve global state store restoration by using multiple 
update threads
 Key: KAFKA-8023
 URL: https://issues.apache.org/jira/browse/KAFKA-8023
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Patrik Kleindl


Currently, global state stores are restored sequentially, and the partitions of 
each global state store are restored sequentially as well.

Loop over stores:

https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L155

Loop over partitions:

https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L256

It would be a great improvement if one or both of those loops could be 
processed in parallel.
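For illustration, a minimal sketch of what parallelizing the outer loop could 
look like (hypothetical names; restoreState() and the thread count are 
stand-ins, not the actual GlobalStateManagerImpl code):
{code:java}
// Hypothetical sketch: restore each global store on its own update thread
// and wait for all restorations to finish.
static void restoreAllStores(List<StateStore> globalStores, int numUpdateThreads)
        throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(numUpdateThreads);
    try {
        List<Future<?>> pending = new ArrayList<>();
        for (StateStore store : globalStores) {
            pending.add(pool.submit(() -> restoreState(store))); // restoreState is a stand-in
        }
        for (Future<?> f : pending) {
            f.get(); // surface any restoration failure
        }
    } finally {
        pool.shutdown();
    }
}
{code}
The same approach could be applied to the inner per-partition loop, at the cost 
of coordinating offsets/checkpoints per partition.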

A possibly related task is https://issues.apache.org/jira/browse/KAFKA-6721 

Mail discussion: 
https://lists.apache.org/thread.html/6fc4772eb8635c04b0ee6682003a99a5ef37ebccffea6c89752e96b1@%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8024) UtilsTest.testFormatBytes fails with German locale

2019-03-01 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8024:
-

 Summary: UtilsTest.testFormatBytes fails with German locale
 Key: KAFKA-8024
 URL: https://issues.apache.org/jira/browse/KAFKA-8024
 Project: Kafka
  Issue Type: Bug
Reporter: Patrik Kleindl


The unit test fails when the default locale is not English (in my case, de_AT)

assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));

 
org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
    org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
        at org.junit.Assert.assertEquals(Assert.java:115)
        at org.junit.Assert.assertEquals(Assert.java:144)
        at 
org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
 

The easiest fix in this case should be adding
{code:java}
jvmArgs '-Duser.language=en -Duser.country=US'{code}
to the test configuration 

[https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270]
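Alternatively (my suggestion, not part of the fix proposed above), the test 
itself could pin the locale around the assertion:
{code:java}
// Sketch: make the assertion independent of the JVM's default locale.
Locale original = Locale.getDefault();
try {
    Locale.setDefault(Locale.US);
    assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
} finally {
    Locale.setDefault(original); // restore so other tests are unaffected
}
{code}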

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-2.2-jdk8 #40

2019-03-01 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-8012; Ensure partitionStates have not been removed before

--
[...truncated 2.69 MB...]
kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled 
PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrNotLive 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrNotLive 
PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionDisabled 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionDisabled 
PASSED

kafka.controller.PartitionStateMachineTest > 
testNonexistentPartitionToNewPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNonexistentPartitionToNewPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates PASSED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToNonexistentPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToNonexistentPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOfflineTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOfflineTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOfflinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOfflinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > testUpdatingOfflinePartitionsCount 
STARTED

kafka.controller.PartitionStateMachineTest > testUpdatingOfflinePartitionsCount 
PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOnlinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOnlinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOfflinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOfflinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates 
STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates 
PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNewPartitionToNonexistentPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNewPartitionToNonexistentPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidOnlinePartitionToNewPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidOnlinePartitionToNewPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testUpdatingOfflinePartitionsCountDuringTopicDeletion STARTED

kafka.controller.PartitionStateMachineTest > 
testUpdatingOfflinePartitionsCountDuringTopicDeletion PASSED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup STARTED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransitionForControlledShutdown STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransitionForControlledShutdown PASSED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup 
STARTED

kafka.controlle

Jenkins build is back to normal : kafka-2.1-jdk8 #139

2019-03-01 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-430 - Return Authorized Operations in Describe Responses

2019-03-01 Thread Manikumar
Hi all,

KIP-430 proposed a new method on the Scala `Authorizer` trait with a
default implementation so that existing implementations continue to work.
But Scala 2.11 does not translate a default implementation in a trait into
a default method in Java, so this breaks existing Java authorizer
implementations when building against the Scala 2.11 version of core. Due
to this, we are dropping the Authorizer API related changes.

Thanks to Rajini for noticing this. Updated the KIP

.
Please let us know if you have any concerns.

Thanks,
Manikumar

On Thu, Feb 28, 2019 at 10:00 AM Manikumar 
wrote:

>  Hi All,
>
> While implementing KIP-430, we have added a supportedOperations() method to
> the kafka.security.auth.ResourceType public API.
> This will be used to maintain the supported operations for a resource type.
> Updated the KIP
> 
> with the new method details.
> Please take note of this.
>
> Thanks,
> Manikumar
>
> On Wed, Feb 20, 2019 at 6:42 PM Rajini Sivaram 
> wrote:
>
>> If there are no other concerns or suggestions, I will start vote on this
>> KIP later today.
>>
>> Thanks,
>>
>> Rajini
>>
>> On Mon, Feb 18, 2019 at 10:09 AM Rajini Sivaram 
>> wrote:
>>
>> > Hi Magnus,
>> >
>> > Have your concerns been addressed in the KIP?
>> >
>> > Thanks,
>> >
>> > Rajini
>> >
>> > On Wed, Feb 13, 2019 at 3:33 PM Satish Duggana <
>> satish.dugg...@gmail.com>
>> > wrote:
>> >
>> >> Hi Rajini,
>> >> That makes sense, thanks for the clarification.
>> >>
>> >> Satish.
>> >>
>> >> On Wed, Feb 13, 2019 at 7:30 PM Rajini Sivaram <
>> rajinisiva...@gmail.com>
>> >> wrote:
>> >> >
>> >> > Thanks for the reviews!
>> >> >
>> >> > Hi Satish,
>> >> >
>> >> > The authorised operations returned will use the same values as the
>> >> > operations returned by the existing DescribeAclsResponse. AdminClient
>> >> will
>> >> > return these using the existing enum AclOperation.
>> >> >
>> >> > Hi Magnus,
>> >> >
>> >> > The MetadataResponse contains these two lines:
>> >> >
>> >> > - Metadata Response => throttle_time_ms [brokers] cluster_id
>> >> > controller_id [topic_metadata] [authorized_operations]  <== ADDED
>> >> > authorized_operations
>> >> > - topic_metadata => error_code topic is_internal [partition_metadata]
>> >> > [authorized_operations]  <== ADDED authorized_operations
>> >> >
>> >> > The first is for the cluster's authorized operations and the second
>> for
>> >> > each topic. Did I misunderstand your question? The full set of
>> >> operations
>> >> > for each resource type is included in the subsection `AdminClient API
>> >> > Changes`.
>> >> >
>> >> > Under `Rejected Alternatives` I have included addition of a separate
>> >> > request to get this information rather than extend an existing one.
>> The
>> >> > rationale for including all the information in one request is to
>> enable
>> >> > clients to get all relevant metadata using a single API rather than
>> >> have to
>> >> > send multiple requests, get responses and combine the two while
>> >> resource or
>> >> > ACLs may have changed in between. It seems neater to use a single API
>> >> since
>> >> > a user getting authorized operations is almost definitely going to
>> do a
>> >> > Describe first and access control for both of these is controlled
>> using
>> >> > Describe access. If we add new resource types with a corresponding
>> >> > Describe, we would simply need to add `authorized_operations` for
>> their
>> >> > Describe.
>> >> >
>> >> > Hi Manikumar,
>> >> >
>> >> > Added IdempotentWrite for Cluster, thanks for pointing that out! I
>> was
>> >> > thinking that if authorizer is not configured, we could return all
>> >> > supported operations since the user can perform all operations.
>> Added a
>> >> > note to the KIP.
>> >> >
>> >> > Regards,
>> >> >
>> >> > Rajini
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Feb 13, 2019 at 11:07 AM Manikumar <
>> manikumar.re...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > Thanks for the KIP.
>> >> > >
>> >> > > 1. Can't we include IdempotentWrite/ClusterResource operations for the
>> >> > > Cluster resource?
>> >> > > 2. What will be the API behaviour when the authorizer is not
>> >> > > configured? I assume we return an empty list.
>> >> > >
>> >> > > Thanks,
>> >> > > Manikumar
>> >> > >
>> >> > > On Wed, Feb 13, 2019 at 12:33 AM Rajini Sivaram <
>> >> rajinisiva...@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > > > Hi all,
>> >> > > >
>> >> > > > I have created a KIP to optionally request authorised operations
>> on
>> >> > > > resources when describing resources:
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > >
>> >>
>> 

[jira] [Created] (KAFKA-8025) Flaky Test RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls

2019-03-01 Thread Konstantine Karantasis (JIRA)
Konstantine Karantasis created KAFKA-8025:
-

 Summary: Flaky Test 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls
 Key: KAFKA-8025
 URL: https://issues.apache.org/jira/browse/KAFKA-8025
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Konstantine Karantasis
Assignee: Guozhang Wang


There has been at least one occurrence of the following unit test case failing 
on a Jenkins job that didn't involve any related changes. 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2783/consoleFull]

I have not been able to reproduce it locally on Linux. (For instance, 20 
consecutive runs of this class pass all test cases.)
{code:java}
14:06:13 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls STARTED
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls.test.stdout
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls FAILED
14:06:14     java.lang.AssertionError:
14:06:14     Expected: a string matching the pattern 'Unexpected method call DBOptions\.baseBackgroundCompactions((.*
14:06:14     *)*):'
14:06:14          but: was "Unexpected method call DBOptions.baseBackgroundCompactions():\n    DBOptions.close(): expected: 3, actual: 0"
14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.verifyDBOptionsMethodCall(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:121)
14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:101)
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls STARTED
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls PASSED

{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8026) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted

2019-03-01 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8026:
--

 Summary: Flaky Test 
RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted
 Key: KAFKA-8026
 URL: https://issues.apache.org/jira/browse/KAFKA-8026
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 1.1.1, 1.0.2
Reporter: Matthias J. Sax
 Fix For: 1.0.3, 1.1.2


{quote}java.lang.AssertionError: Condition not met within timeout 15000. Stream 
tasks not updated
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
at 
org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:215){quote}
Happened in the 1.0 and 1.1 builds:
[https://builds.apache.org/blue/organizations/jenkins/kafka-1.0-jdk7/detail/kafka-1.0-jdk7/263/tests/]

and
[https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/249/tests/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7918.

Resolution: Fixed

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, where all the `StoreBuilder`s require 
> an `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  
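For illustration, a sketch of the cleanup in spirit (the interfaces below are 
illustrative stand-ins, not the actual store classes):
{code:java}
// Before: generically typed even though keys/values at this layer are always
// Bytes / byte[] (serialization happens at a higher level).
interface GenericBytesStore<K, V> {
    void put(K key, V value);
}

// After (the proposed cleanup): inline the byte types and rename to make the
// "bytes store" layer explicit, e.g. InMemoryKeyValueStore -> InMemoryKeyValueBytesStore.
interface InMemoryKeyValueBytesStore {
    void put(org.apache.kafka.common.utils.Bytes key, byte[] value);
}
{code}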



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-03-01 Thread Paul Davidson
Thanks Randall.  I like your suggestion: as you say, this would make it
possible to usefully override the default client id properties.

I'm not sure how we would handle the dead-letter queue case though - maybe
we could automatically add a "dlq-" prefix to the producer client id?

If there is agreement on this change I will update the KIP and the PR (when
I find some time).
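For illustration, a minimal sketch of the substitution Randall describes below 
(a hypothetical helper, not the actual Connect code):

    // Hypothetical sketch: expand ${connectorName} and ${task} placeholders
    // in an overridden client.id template.
    static String expandClientId(String template, String connectorName, int task) {
        return template
                .replace("${connectorName}", connectorName)
                .replace("${task}", Integer.toString(task));
    }

    // expandClientId("connect-cluster-A-${connectorName}-${task}-producer", "my-connector", 1)
    // yields "connect-cluster-A-my-connector-1-producer"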


On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch  wrote:

> Hi, Paul. Thanks for the update to KIP-411 to reflect adding defaults, and
> creating/updating https://github.com/apache/kafka/pull/6097 to reflect
> this
> approach.
>
> Now that we've avoided adding a new config and have changed the default `
> client.id` to include some context, the connector name, and task number, I
> think it makes overriding the client ID via worker config `
> producer.client.id` or `consumer.client.id` properties less valuable
> because those overridden client IDs will be exactly the same for all
> connectors and tasks.
>
> On one hand, we can leave this as-is, and any users that include `
> producer.client.id` and `consumer.client.id` in their worker configs keep
> the same (sort of useless) behavior. In fact, most users would probably be
> better off by removing these worker config properties and instead relying
> upon the defaults.
>
> On the other, similar to what Ewen suggested earlier (in a different
> context), we could add support for users to optionally use
> "${connectorName}" and ${task}" in their overridden client ID property and
> have Connect replace these (if found) with the connector name and task
> number. Any existing properties that don't use these variables would behave
> as-is, but this way the users could define their own client IDs yet still
> get the benefit of uniquely identifying each of the clients. For example,
> if my worker config contained the following:
>
> producer.client.id=connect-cluster-A-${connectorName}-${task}-producer
> consumer.client.id=connect-cluster-A-${connectorName}-${task}-consumer
>
> Thoughts?
>
> Randall
>
> On Wed, Feb 20, 2019 at 3:18 PM Ryanne Dolan 
> wrote:
>
> > Thanks Paul, this is great. This will make monitoring Connect a ton
> easier.
> >
> > Ryanne
> >
> > On Wed, Feb 20, 2019 at 1:24 PM Paul Davidson
> >  wrote:
> >
> > > I have updated KIP-411 to propose changing the default client id - see:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > >
> > >
> > > There is also an PR ready to go here:
> > > https://github.com/apache/kafka/pull/6097
> > >
> > > On Fri, Jan 11, 2019 at 3:39 PM Paul Davidson <
> pdavid...@salesforce.com>
> > > wrote:
> > >
> > > > Hi everyone.  We seem to have agreement that the ideal approach is to
> > > > alter the default client ids. Now I'm wondering about the best
> process
> > to
> > > > proceed. Will the change in default behaviour require a new KIP,
> given
> > it
> > > > will affect existing deployments?  Would I be best to repurpose this
> > > > KIP-411, or am I best to  create a new KIP? Thanks!
> > > >
> > > > Paul
> > > >
> > > > On Tue, Jan 8, 2019 at 7:16 PM Randall Hauch 
> wrote:
> > > >
> > > >> Hi, Paul.
> > > >>
> > > >> I concur with the others, and I like the new approach that avoids a
> > new
> > > >> configuration, especially because it does not change the behavior
> for
> > > >> anyone already using `producer.client.id` and/or `
> consumer.client.id
> > `.
> > > I
> > > >> did leave a few comments on the PR. Perhaps the biggest one is
> whether
> > > the
> > > >> producer used for the sink task error reporter (for dead letter
> queue)
> > > >> should be `connector-producer-`, and whether that is
> > > >> distinct
> > > >> enough from source tasks, which will be of the form
> > > >> `connector-producer-`. Maybe it is fine. (The other
> > > >> comments were minor.)
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Randall
> > > >>
> > > >> On Mon, Jan 7, 2019 at 1:19 PM Paul Davidson <
> > pdavid...@salesforce.com>
> > > >> wrote:
> > > >>
> > > >> > Thanks all. I've submitted a new PR with a possible
> implementation:
> > > >> > https://github.com/apache/kafka/pull/6097. Note I did not include
> > the
> > > >> > group
> > > >> > ID as part of the default client ID, mainly to avoid the connector
> > > name
> > > >> > appearing twice by default. As noted in the original Jira (
> > > >> > https://issues.apache.org/jira/browse/KAFKA-5061), leaving out
> the
> > > >> group
> > > >> > ID
> > > >> > could lead to naming conflicts if multiple clusters run the same
> > Kafka
> > > >> > cluster. This would probably not be a problem for many (including
> > us)
> > > as
> > > >> > metrics exporters can usually be configured to include a cluster
> ID
> > > and
> > > >> > guarantee uniqueness. Will be interested to hear your thoughts on
> > > this.
> > > >> >
> > > >> > Paul
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Mon, Jan 7, 2019 at 10:27 AM Ryanne Dola

[jira] [Created] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2019-03-01 Thread Prashant (JIRA)
Prashant created KAFKA-8027:
---

 Summary: Gradual decline in performance of CachingWindowStore 
provider when number of keys grow
 Key: KAFKA-8027
 URL: https://issues.apache.org/jira/browse/KAFKA-8027
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Prashant


We observed this during a performance test of our stream application, which 
tracks users' activity and provides a REST interface to query the window state 
store.  We used the default configuration of Materialized, i.e. 
withCachingEnabled, for storing user behaviour stats in a window state store (a 
CompositeWindowStore with CachingWindowStore as the underlying store, which 
internally uses RocksDBStore for persistence).  

While querying the window store with store.fetch(key, long, long), it internally 
tries to fetch the range from the ThreadCache, which uses a byte iterator to 
search for a key in the cache and, on a cache miss, goes to RocksDBStore for the 
result. So, when the number of keys in the cache becomes large, this ThreadCache 
search starts taking time (a range iterator over all keys), which impacts 
WindowStore query performance.

Workaround: if we disable the cache with the switch on the Materialized 
instance, i.e. withCachingDisabled, the key search is delegated directly to 
RocksDBStore, which is much faster and completes the search in microseconds, 
versus milliseconds in the case of CachingWindowStore.  
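For reference, a sketch of the workaround in the Streams DSL (topic name, store 
name, and serdes are placeholders):
{code:java}
// Sketch: build the 24h window store with caching disabled so fetches go
// straight to RocksDB instead of scanning the ThreadCache.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("user-activity", Consumed.with(Serdes.String(), Serdes.String()))
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofHours(24)))
       .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("user-stats")
                          .withCachingDisabled());
{code}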

Stats: with unique users > 0.5M, random search for a key (i.e. a UserId):

 

withCachingEnabled:  40ms < t < 80ms (upper bound increases as unique users grow)
withCachingDisabled: t < 1ms (almost constant time)      



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk11 #330

2019-03-01 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-03-01 Thread Colin McCabe
On Wed, Feb 27, 2019, at 15:53, Harsha wrote:
> Hi Colin,
> I overlooked the IDEMPOTENT_WRITE ACL. This, along with 
> client.min.version, should solve the cases proposed in the KIP.
> Can we turn this KIP into adding a min.client.version config to the broker? 
> It could be part of the dynamic config.

+1, sounds like a good idea.

Colin


> 
> Thanks,
> Harsha
> 
> On Wed, Feb 27, 2019, at 12:17 PM, Colin McCabe wrote:
> > On Tue, Feb 26, 2019, at 16:33, Harsha wrote:
> > > Hi Colin,
> > >   
> > > "> I think Ismael and Gwen here bring up a good point.  The version of 
> > > the 
> > > > request is a technical detail that isn't really related to 
> > > > authorization.  There are a lot of other technical details like this 
> > > > like the size of the request, the protocol it came in on, etc.  None of 
> > > > them are passed to the authorizer-- they all have configuration knobs 
> > > > to control how we handle them.  If we add this technical detail, 
> > > > logically we'll have to start adding all the others, and the authorizer 
> > > > API will get really bloated.  It's better to keep it focused on 
> > > > authorization, I think."
> > > 
> > > probably my previous email is not clear but I am agreeing with Gwen's 
> > > point. 
> > > I am not in favor of extending authorizer to support this.
> > > 
> > > 
> > > "> Another thing to consider is that if we add a new broker configuration 
> > > > that lets us set a minimum client version which is allowed, that could 
> > > > be useful to other users as well.  On the other hand, most users are 
> > > > not likely to write a custom authorizer to try to take advantage of 
> > > > version information being passed to the authorizer.  So, I think using> 
> > > > a configuration is clearly the better way to go here.  Perhaps it can 
> > > > be a KIP-226 dynamic configuration to make this easier to deploy?"
> > > 
> > > Although minimum client version might help to a certain extent there 
> > > are other cases where we want users to not start using transactions for 
> > > example. My proposal in the previous thread was to introduce another 
> > > module/interface, let's say
> > > "SupportedAPIs" which will take in dynamic configuration to check which 
> > > APIs are allowed. 
> > > It can throw UnsupportedException just like we are throwing 
> > > Authorization Exception.
> > 
> > Hi Harsha,
> > 
> > We can already prevent people from using transactions using ACLs, 
> > right?  That's what the IDEMPOTENT_WRITE ACL was added for.
> > 
> > In general, I think users should be able to think of ACLs in terms of 
> > "what can I do" rather than "how is it implemented."  For example, 
> > maybe some day we will replace FetchRequest with GetStuffRequest.  But 
> > users who have READ permission on a topic shouldn't have to change 
> > anything.  So I think the Authorizer interface should not be aware of 
> > individual RPC types or message versions.
> > 
> > best,
> > Colin
> > 
> > 
> > > 
> > > 
> > > Thanks,
> > > Harsha
> > > 
> > > 
> > > n Tue, Feb 26, 2019, at 10:04 AM, Colin McCabe wrote:
> > > > Hi Harsha,
> > > > 
> > > > I think Ismael and Gwen here bring up a good point.  The version of the 
> > > > request is a technical detail that isn't really related to 
> > > > authorization.  There are a lot of other technical details like this 
> > > > like the size of the request, the protocol it came in on, etc.  None of 
> > > > them are passed to the authorizer-- they all have configuration knobs 
> > > > to control how we handle them.  If we add this technical detail, 
> > > > logically we'll have to start adding all the others, and the authorizer 
> > > > API will get really bloated.  It's better to keep it focused on 
> > > > authorization, I think.
> > > > 
> > > > Another thing to consider is that if we add a new broker configuration 
> > > > that lets us set a minimum client version which is allowed, that could 
> > > > be useful to other users as well.  On the other hand, most users are 
> > > > not likely to write a custom authorizer to try to take advantage of 
> > > > version information being passed to the authorizer.  So, I think  using 
> > > > a configuration is clearly the better way to go here.  Perhaps it can 
> > > > be a KIP-226 dynamic configuration to make this easier to deploy?
> > > > 
> > > > cheers,
> > > > Colin
> > > > 
> > > > 
> > > > On Mon, Feb 25, 2019, at 15:43, Harsha wrote:
> > > > > Hi Ying,
> > > > > I think the question is can we add a module in the core which 
> > > > > can take up the dynamic config and does a block certain APIs.  This 
> > > > > module will be called in each of the APIs like the authorizer does 
> > > > > today to check if the API is supported for the client. 
> > > > > Instead of throwing AuthorizationException like the authorizer does 
> > > > > today it can throw UnsupportedException.
> > > > > Benefits are,  we are keeping the authorizer interface as is and 
> > > > > adding 
> > > > >

Re: [DISCUSS] KIP-426: Persist Broker Id to Zookeeper

2019-03-01 Thread Colin McCabe
On Wed, Feb 27, 2019, at 14:12, Harsha wrote:
> Hi Colin,
>   What we want is to preserve the broker.id so that we 
> can do an offline rebuild of a broker. In our case, bringing a failed node 
> back up through online Kafka replication puts producer latencies at risk, 
> given that the new broker will keep all the other leaders busy with its 
> replication requests. For an offline rebuild, we do not need to do a 
> rebalance as long as we can recover the broker.id.
>   Overall, irrespective of this use case, we still want the 
> ability to retrieve the broker.id for an existing host. This will make it 
> easier to swap in new hosts for failed hosts while keeping the existing 
> hostname.

Thanks for the explanation.  Shouldn't this be handled by the cluster 
management tool, though?  Kafka doesn't include a mechanism for re-creating 
nodes that failed.  That's up to Kubernetes, or Ansible, or whatever cluster 
provisioning framework you have in place.  This feels like the same kind of 
thing: managing how the cluster is provisioned.

best,
Colin

> 
> Thanks,
> Harsha
> On Wed, Feb 27, 2019, at 11:53 AM, Colin McCabe wrote:
> > Hi Li,
> > 
> >  > The mechanism simplifies deployment because the same configuration can be 
> >  > used across all brokers. However, in a large system where disk failure is 
> >  > the norm, the meta file could often get lost, causing a new broker id to 
> >  > be allocated. This is problematic because the new broker id has no 
> >  > partitions assigned to it, so it can't do anything, while partitions 
> >  > assigned to the old one lose one replica
> > 
> > If all of the disks have failed, then the partitions will lose their 
> > replicas no matter what, right?  If any of the disks is still around, 
> > then there will be a meta file on the disk which contains the previous 
> > broker ID.  So I'm not sure that we need to change anything here.
> > 
> > best,
> > Colin
> > 
> > 
> > On Tue, Feb 5, 2019, at 14:38, Li Kan wrote:
> > > Hi, I have KIP-426, which is a small change on automatically determining
> > > the broker id when starting up. I am new to Kafka, so there are a bunch of
> > > design trade-offs that I might be missing or find hard to decide, so I'd
> > > like to get some suggestions on it. I'd expect (and am open) to modify (or
> > > even totally rewrite) the KIP based on suggestions. Thanks.
> > > 
> > > -- 
> > > Best,
> > > Kan
> > >
> >
>


[jira] [Resolved] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7912.

Resolution: Fixed

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
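For illustration, a minimal standalone sketch of the difference (stand-in types 
and range bounds, not the actual store code):
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;

class SubMapSketch {
    public static void main(String[] args) {
        TreeMap<Bytes, byte[]> map = new TreeMap<>();
        Bytes from = Bytes.wrap(new byte[]{0});         // placeholder range bounds
        Bytes to = Bytes.wrap(new byte[]{(byte) 0xFF});

        // Live view: iterating this while another thread writes to `map` is unsafe.
        Iterator<Map.Entry<Bytes, byte[]>> unsafe =
            map.subMap(from, true, to, true).entrySet().iterator();

        // Snapshot: copying the submap first yields an iterator that cannot be
        // invalidated by concurrent writes (at the cost of the copy).
        Iterator<Map.Entry<Bytes, byte[]>> safe =
            new TreeMap<>(map.subMap(from, true, to, true)).entrySet().iterator();
    }
}
{code}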



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2019-03-01 Thread George Li
 Hi Jun,
Could you help review KIP-236 when you have time?  Thanks.


Hi Becket, 
Since you filed https://issues.apache.org/jira/browse/KAFKA-6304 to request 
this feature, could you also help review and comment on KIP-236?  Thanks. 



Hi Viktor,
I have updated https://github.com/apache/kafka/pull/6296  and also  KIP-236:  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
 Please see the new section below, "Skip Reassignment Cancellation Scenarios": 

Skip Reassignment Cancellation Scenarios

There are a couple of scenarios in which the pending reassignments in 
/admin/reassign_partitions cannot be cancelled / rolled back:   
   
   - If the "original_replicas" field is missing for the topic/partition in 
/admin/reassign_partitions (see the example znode content after this list). In 
this case the pending reassignment cancellation will be skipped, because there 
is no way to reset to the original replicas.  The reasons this can happen could 
be:   
  - the user/client tampered with /admin/reassign_partitions directly and 
did not include the "original_replicas" for the topic
  - the user/client used an older version of the admin client to submit the 
reassignment.  The Kafka software should be upgraded not just on all the 
brokers in the cluster but also on the host used to submit reassignments.   
  
  


   - If all the "original_replicas" brokers are not in ISR,  and some brokers 
in the "new_replicas" are not offline for the topic/partition in the pending 
reassignments.   In this case, it's better to skip this topic's pending 
reassignment  cancellation/rollback,  otherwise, it will become offline.  
However,  if all the brokers in "original_replicas" are offline  AND  all the 
brokers in "new_replicas" are also offline for this topic/partition,  then the 
cluster is in such a bad state, the topic/partition is currently offline 
anyway,  it will cancel/rollback this topic pending reassignments back to the 
"original_replicas".     


   


What other scenarios can others think of in which the reassignment cancellation 
should be skipped?  Thanks


Hi All,

Another issue I would like to raise is removing the throttle for the cancelled 
reassignments.  Currently the throttle-removal code is in the admin client.  
Since the pending reassignments are cancelled / rolled back, the throttle would 
not be removed by running the admin client with the --verify option.  My 
approach is to remove the throttle in the admin client after the reassignment 
cancellation, but I feel it is better to move this into the Controller (for the 
controller failover scenario). 


Thanks,
George



On Monday, February 25, 2019, 11:40:08 AM PST, George Li 
 wrote:  
 
  Hi Viktor, 
Thanks for the response.  Good questions!  answers below: 
> A few questions regarding the rollback algorithm:
> 1. At step 2 how do you elect the leader? 

The step 2 code is in "Enable pending reassignments cancellation/rollback by 
sql888 · Pull Request #6296 · apache/kafka", 
core/src/main/scala/kafka/controller/KafkaController.scala line #622 



rollbackReassignedPartitionLeaderIfRequired(topicPartition, 
reassignedPartitionContext)

During a "pending" reassignment, e.g. (1,2,3) => (4,2,5), the leader (in this 
case broker_id 1) normally remains the leader until all replicas (1,2,3,4,5) 
are in ISR, at which point leadership is switched to 4.  However, if the new 
replica 4 is already caught up in ISR and the original leader 1 goes down or is 
bounced, 4 can become the new leader. rollbackReassignedPartitionLeaderIfRequired() 
will then run a leader election using PreferredReplicaPartitionLeaderElectionStrategy 
among the brokers in OAR (the Original Assigned Replicas set kept in memory).

> 1.1. Would it always be the original leader? 
Not necessarily; if the original preferred leader is down, it can be another 
broker in OAR that is in ISR.
> 1.2. What if some brokers that are in OAR are down?
If some brokers in OAR are down, the topic/partition will have URPs (Under 
Replicated Partitions). The client deciding to do a reassignment should be 
clear about the current state of the cluster: which brokers are down, which are 
up, and what the reassignment is trying to accomplish (e.g. reassignment from 
down brokers to new brokers?). 

> 2. I still have doubts that we need to do the reassignment backwards during 
> rollback. For instance, if we decide to cancel the reassignment at step #8, 
> where replicas in OAR - RAR are offline, and start the rollback, then how do 
> we make a replica from OAR online again before electing a leader as described 
> in step #2 of the rollback algorithm?
> 3. Does the algorithm defend against crashes? Is it able to continue after a 
> controlle

[VOTE] 2.2.0 RC1

2019-03-01 Thread Matthias J. Sax
Hello Kafka users, developers and client-developers,

This is the second candidate for release of Apache Kafka 2.2.0.

 - Added SSL support for custom principal name
 - Allow SASL connections to periodically re-authenticate
 - Improved consumer group management
   - default group.id is `null` instead of empty string
 - Add --under-min-isr option to describe topics command
 - API improvement
   - Producer: introduce close(Duration)
   - AdminClient: introduce close(Duration)
   - Kafka Streams: new flatTransform() operator in Streams DSL
   - KafkaStreams (and other classes) now implement AutoCloseable to
support try-with-resources (see the example after this list)
   - New Serdes and default method implementations
 - Kafka Streams exposed internal client.id via ThreadMetadata
 - Metric improvements:  All `-min`, `-avg` and `-max` metrics will now
output `NaN` as default value
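For example, the try-with-resources support looks like this (a sketch; the
topology and props variables are placeholders):

    // KafkaStreams now implements AutoCloseable, so it can be used in
    // try-with-resources; close(Duration) bounds the shutdown wait.
    try (KafkaStreams streams = new KafkaStreams(topology, props)) {
        streams.start();
        // ... run the application ...
    }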

Release notes for the 2.2.0 release:
https://home.apache.org/~mjsax/kafka-2.2.0-rc1/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, March 7, 9am PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~mjsax/kafka-2.2.0-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~mjsax/kafka-2.2.0-rc1/javadoc/

* Tag to be voted upon (off 2.2 branch) is the 2.2.0 tag:
https://github.com/apache/kafka/releases/tag/2.2.0-rc1

* Documentation:
https://kafka.apache.org/22/documentation.html

* Protocol:
https://kafka.apache.org/22/protocol.html

* Jenkins builds for the 2.2 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/

* System tests for the 2.2 branch:
https://jenkins.confluent.io/job/system-test-kafka/job/2.2/


/**

Thanks,


-Matthias





Re: [VOTE] 2.2.0 RC1

2019-03-01 Thread Kevin Lu
Hi Matthias,

The Confluence release page is correct, but the “Add --under-min-isr option to
describe topics command” item was pushed back to 2.3.0.

Thanks,
Kevin

On Fri, Mar 1, 2019 at 11:48 AM Matthias J. Sax 
wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 2.2.0.
>
>  - Added SSL support for custom principal name
>  - Allow SASL connections to periodically re-authenticate
>  - Improved consumer group management
>- default group.id is `null` instead of empty string
>  - Add --under-min-isr option to describe topics command
>  - API improvement
>- Producer: introduce close(Duration)
>- AdminClient: introduce close(Duration)
>- Kafka Streams: new flatTransform() operator in Streams DSL
>- KafkaStreams (and other classes) now implement AutoCloseable to
> support try-with-resources
>- New Serdes and default method implementations
>  - Kafka Streams exposed internal client.id via ThreadMetadata
>  - Metric improvements:  All `-min`, `-avg` and `-max` metrics will now
> output `NaN` as default value
>
> Release notes for the 2.2.0 release:
> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, March 7, 9am PST
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/javadoc/
>
> * Tag to be voted upon (off 2.2 branch) is the 2.2.0 tag:
> https://github.com/apache/kafka/releases/tag/2.2.0-rc1
>
> * Documentation:
> https://kafka.apache.org/22/documentation.html
>
> * Protocol:
> https://kafka.apache.org/22/protocol.html
>
> * Jenkins builds for the 2.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/
>
> * System tests for the 2.2 branch:
> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/
>
>
> /**
>
> Thanks,
>
>
> -Matthias
>
>


Error deserializing Avro message for id 25

2019-03-01 Thread Zhou, Limin (Ray)
Hello

I am seeing the following error in the log when consuming an Avro message. I 
tried to google it with no luck. Could anyone explain what this error means?

Thanks
Raymond

2019-02-27 01:37:35,135 ERROR [stderr] (default task-24) 
org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition MDDisputeAcceptRequest-0 at offset 23. If needed, 
please seek past the record to continue consumption.
2019-02-27 01:37:35,137 ERROR [stderr] (default task-24) Caused by: 
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro 
message for id 25
2019-02-27 01:37:35,139 ERROR [stderr] (default task-24) Caused by: 
org.apache.kafka.common.errors.SerializationException: Could not find class 
b24_ptlf_financial_formatted specified in writer's schema whilst finding 
reader's schema for a SpecificRecord.


Re: Apache Kafka Memory Leakage???

2019-03-01 Thread Sönke Liebau
Hi Syed,

from your screenshot I assume that you are using SnapLogic to run your
code (full disclosure: I do not have the faintest idea of this
product!). I've just had a look at the docs and am a bit confused by
their explanation of the metric that you point out in your image
"Memory Allocated". The docs say: "The Memory Allocated reflects the
number of bytes that were allocated by the Snap.  Note that this
number does not reflect the amount of memory that was freed and it is
not the peak memory usage of the Snap.  So, it is not necessarily a
metric that can be used to estimate the required size of a Snaplex
node.  Rather, the number provides an insight into how much memory had
to be allocated to process all of the documents.  For example, if the
total allocated was 5MB and the Snap processed 32 documents, then the
Snap allocated roughly 164KB per document.  When combined with the
other statistics, this number can help to identify the potential
causes of performance issues."
The part about not reflecting memory that was freed makes me somewhat
doubtful whether this actually reflects how much memory the process
currently holds.  Can you give some more insight there?

Apart from that, I just ran your code somewhat modified to make it
work without dependencies for 2 hours and saw no unusual memory
consumption, just a regular garbage collection sawtooth pattern. That
being said, I had to replace your actual processing with a simple
println, so if there is a memory leak in there I would of course not
have noticed.
I've uploaded the code I ran [1] for reference. For further analysis,
maybe you could run something similar with just a println or noop and
see if the symptoms persist, to localize the leak (if it exists).

Also, two random observations on your code:

KafkaConsumer.poll(Long timeout) is deprecated, you should consider
using the overloaded version with a Duration parameter instead.
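For example (a sketch of the replacement call; poll(Duration) was added by
KIP-266 in Kafka 2.0):

    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));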

The comment at [2] seems to contradict the following code, as the
offsets are only changed when in suggest mode. But as I have no idea
what suggest mode even is, or what all this means, this observation may be
miles off the point :)

I hope that helps a little.

Best regards,
Sönke

[1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
[2] 
https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86


On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
 wrote:
>
>
> Thanks,
>
>
>
> -- Forwarded message -
> From: Syed Mudassir Ahmed 
> Date: Tue, Feb 26, 2019 at 12:40 PM
> Subject: Apache Kafka Memory Leakage???
> To: 
> Cc: Syed Mudassir Ahmed 
>
>
> Hi Team,
>   I have a Java application based on the latest Apache Kafka version, 2.1.1.
>   I have a consumer application that runs infinitely to consume messages 
> whenever they are produced.
>   Sometimes no messages are produced for hours.  Still, I see that the memory 
> allocated to the consumer program is increasing drastically.
>   My code is as follows:
>
> AtomicBoolean isRunning = new AtomicBoolean(true);
>
> Properties kafkaProperties = new Properties();
>
> kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>
> kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>
> kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, 
> UUID.randomUUID().toString());
> kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
> AUTO_OFFSET_RESET_EARLIEST);
> consumer = new KafkaConsumer(kafkaProperties, 
> keyDeserializer, valueDeserializer);
> if (topics != null) {
> subscribeTopics(topics);
> }
>
>
> boolean infiniteLoop = false;
> boolean oneTimeMode = false;
> int timeout = consumeTimeout;
> if (isSuggest) {
> //Configuration for suggest mode
> oneTimeMode = true;
> msgCount = 0;
> timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
> } else if (msgCount < 0) {
> infiniteLoop = true;
> } else if (msgCount == 0) {
> oneTimeMode = true;
> }
> Map offsets = Maps.newHashMap();
> do {
> ConsumerRecords records = consumer.poll(timeout);
> for (final ConsumerRecord record : records) {
> if (!infiniteLoop && !oneTimeMode) {
> --msgCount;
> if (msgCount < 0) {
> break;
> }
> }
> outputViews.write(new BinaryOutput() {
> @Override
> public Document getHeader() {
> return generateHeader(record, oldHeader);
> }
>
> @Override
> public void write(WritableByteChannel writeChannel) 
> throws IOException {
> try (OutputStream os = 
> Channels.newOutputStream(writeChannel)) {
> os.write(record.value());
> }
>

Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-03-01 Thread Ryanne Dolan
Paul, Randall, I don't think most people will care to exercise so much
control over the client IDs, so long as they are filled in automatically in
a way that eliminates duplicate metrics and remains somewhat legible. If we
let the user specify a pattern or something, we're really just making the
user worry about these requirements.

For example, if they specify "foo" as the client.id, they'll get a bunch of
exceptions about that MBean already existing. So they'll try
"${connectorName}-foo", which won't work because connectors that get
restarted will re-use the same client ID and the same MBean again. And so
on, until they end up solving the same problem we are trying to solve here.

I think you at least need something like "connect--producer-dlq" to
avoid MBeans being re-registered within the same JVM. I believe the task ID
is based on the connector name, so you'd get e.g.
"connect-myconnector-1-producer".

Ryanne


On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
 wrote:

> Thanks Randall.  I like your suggestion: as you say, this would make it
> possible to usefully override the default client id properties.
>
> I'm not sure how we would handle the dead-letter queue case though - maybe
> we could automatically add a "dlq-" prefix to the producer client id?
>
> If there is agreement on this change I will update the KIP and the PR (when
> I find some time).
>
>
> On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch  wrote:
>
> > Hi, Paul. Thanks for the update to KIP-411 to reflect adding defaults,
> and
> > creating/updating https://github.com/apache/kafka/pull/6097 to reflect
> > this
> > approach.
> >
> > Now that we've avoided adding a new config and have changed the default `
> > client.id` to include some context, the connector name, and task
> number, I
> > think it makes overriding the client ID via worker config `
> > producer.client.id` or `consumer.client.id` properties less valuable
> > because those overridden client IDs will be exactly the same for all
> > connectors and tasks.
> >
> > On one hand, we can leave this as-is, and any users that include `
> > producer.client.id` and `consumer.client.id` in their worker configs
> keep
> > the same (sort of useless) behavior. In fact, most users would probably
> be
> > better off by removing these worker config properties and instead relying
> > upon the defaults.
> >
> > On the other, similar to what Ewen suggested earlier (in a different
> > context), we could add support for users to optionally use
> > "${connectorName}" and ${task}" in their overridden client ID property
> and
> > have Connect replace these (if found) with the connector name and task
> > number. Any existing properties that don't use these variables would
> behave
> > as-is, but this way the users could define their own client IDs yet still
> > get the benefit of uniquely identifying each of the clients. For example,
> > if my worker config contained the following:
> >
> > producer.client.id
> =connect-cluster-A-${connectorName}-${task}-producer
> > consumer.client.id
> =connect-cluster-A-${connectorName}-${task}-consumer
> >
> > Thoughts?
> >
> > Randall
> >
> > On Wed, Feb 20, 2019 at 3:18 PM Ryanne Dolan 
> > wrote:
> >
> > > Thanks Paul, this is great. This will make monitoring Connect a ton
> > easier.
> > >
> > > Ryanne
> > >
> > > On Wed, Feb 20, 2019 at 1:24 PM Paul Davidson
> > >  wrote:
> > >
> > > > I have updated KIP-411 to propose changing the default client id -
> see:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > >
> > > >
> > > > There is also an PR ready to go here:
> > > > https://github.com/apache/kafka/pull/6097
> > > >
> > > > On Fri, Jan 11, 2019 at 3:39 PM Paul Davidson <
> > pdavid...@salesforce.com>
> > > > wrote:
> > > >
> > > > > Hi everyone.  We seem to have agreement that the ideal approach is
> to
> > > > > alter the default client ids. Now I'm wondering about the best
> > process
> > > to
> > > > > proceed. Will the change in default behaviour require a new KIP,
> > given
> > > it
> > > > > will affect existing deployments?  Would I be best to repurpose
> this
> > > > > KIP-411, or am I best to  create a new KIP? Thanks!
> > > > >
> > > > > Paul
> > > > >
> > > > > On Tue, Jan 8, 2019 at 7:16 PM Randall Hauch 
> > wrote:
> > > > >
> > > > >> Hi, Paul.
> > > > >>
> > > > >> I concur with the others, and I like the new approach that avoids
> a
> > > new
> > > > >> configuration, especially because it does not change the behavior
> > for
> > > > >> anyone already using `producer.client.id` and/or `
> > consumer.client.id
> > > `.
> > > > I
> > > > >> did leave a few comments on the PR. Perhaps the biggest one is
> > whether
> > > > the
> > > > >> producer used for the sink task error reporter (for dead letter
> > queue)
> > > > >> should be `connector-producer-`, and whether that is
> > > > >> distinct
> > > > >> enough from source tasks, which 

[jira] [Created] (KAFKA-8028) Kafka - ACL remove

2019-03-01 Thread Sathish Yanamala (JIRA)
Sathish Yanamala created KAFKA-8028:
---

 Summary: Kafka - ACL remove 
 Key: KAFKA-8028
 URL: https://issues.apache.org/jira/browse/KAFKA-8028
 Project: Kafka
  Issue Type: Task
Reporter: Sathish Yanamala


Hi Team,

User:--consumer has Allow permission for operations: All from hosts: 
*
 
 "--consumer" added along with principle , please let me know which command to 
remove all operations , I tried using delete operation but it is showing same.
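For reference, a sketch of removing the matching ACLs programmatically with the 
Java AdminClient (principal, topic name, and bootstrap server are placeholders; 
adjust the resource filter to the resources involved):
{code:java}
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // Match all ALLOW ACLs with operation "All" for the principal on one topic.
    AclBindingFilter filter = new AclBindingFilter(
        new ResourcePatternFilter(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
        new AccessControlEntryFilter("User:my-consumer", "*",
                                     AclOperation.ALL, AclPermissionType.ALLOW));
    admin.deleteAcls(Collections.singleton(filter)).all().get();
}
{code}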

 

Thank you,

Sathish Yanamala 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7221) Add Build method to StreamsBuilder accepting Properties

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-7221:


> Add Build method to StreamsBuilder accepting Properties 
> 
>
> Key: KAFKA-7221
> URL: https://issues.apache.org/jira/browse/KAFKA-7221
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-3770) KStream job should be able to specify linger.ms

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3770:


> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override the linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7419) Rolling sum for high frequency stream

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-7419:


> Rolling sum for high frequency stream
> 
>
> Key: KAFKA-7419
> URL: https://issues.apache.org/jira/browse/KAFKA-7419
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Stanislav Bausov
>Priority: Minor
>
> I have a task to count 24h market volume over a high-frequency trades stream, 
> and there is no solution out of the box. Windowing is not an option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7419) Rolling sum for high frequency stream

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7419.

Resolution: Not A Problem

> Rolling sum for high frequency stream
> 
>
> Key: KAFKA-7419
> URL: https://issues.apache.org/jira/browse/KAFKA-7419
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Stanislav Bausov
>Priority: Minor
>
> I have a task to count 24h market volume over a high-frequency trades stream, 
> and there is no solution out of the box. Windowing is not an option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3770) KStream job should be able to specify linger.ms

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3770.

Resolution: Duplicate

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override the linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-6592) NullPointerException thrown when executing ConsoleConsumer with deserializer set to `WindowedDeserializer`

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6592:


> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException was thrown because 
> the inner deserializer was never initialized: there is no place in 
> ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
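> 
> For illustration only (assuming WindowedDeserializer's inner-deserializer
> constructor, which ConsoleConsumer does not expose), the failure disappears
> when the inner deserializer is supplied programmatically:
> {code:java}
> Deserializer<String> inner = new StringDeserializer();
> WindowedDeserializer<String> windowed = new WindowedDeserializer<>(inner);
> {code}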



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6092:


> Time passed in punctuate call is currentTime, not punctuate schedule time. 
> ---
>
> Key: KAFKA-6092
> URL: https://issues.apache.org/jira/browse/KAFKA-6092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Major
>
> The java doc specifies that for a Transformer, calling context.schedule calls 
> punctuate every 1000ms. This is not entirely accurate, as if no data is 
> received for a while, punctuate won't be called.
> {code}
>  * void init(ProcessorContext context) {
>  * this.context = context;
>  * this.state = context.getStateStore("myTransformState");
>  * context.schedule(1000); // call #punctuate() each 
> 1000ms
>  * }
> {code}
> When you receive new data say after 20 seconds, punctuate will play catch up 
> and will be called 20 times at reception of the new data. 
> the signature of punctuate is
> {code}
> * KeyValue punctuate(long timestamp) {
>  * // can access this.state
>  * // can emit as many new KeyValue pairs as required via 
> this.context#forward()
>  * return null; // don't return result -- can also be 
> "new KeyValue()"
>  * }
> {code}
> but the timestamp being passed is currentTimestamp at the time of the call to 
> punctuate, not at the time the punctuate was scheduled. It is very confusing 
> and I think the timestamp should represent the one at which the punctuate 
> should have been scheduled. Getting the current timestamp is not adding much 
> information as it can easily be obtained using System.currentTimeMillis();
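> 
> Since KIP-138 (1.0.0) the two semantics are explicit; as a hedged sketch of
> wall-clock punctuation (the interval value is illustrative only):
> {code:java}
> context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>     // fires roughly every second of wall-clock time, with or without data
> });
> {code}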



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6092.

Resolution: Not A Problem

> Time passed in punctuate call is currentTime, not punctuate schedule time. 
> ---
>
> Key: KAFKA-6092
> URL: https://issues.apache.org/jira/browse/KAFKA-6092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Major
>
> The java doc specifies that for a Transformer, calling context.schedule calls 
> punctuate every 1000ms. This is not entirely accurate, as if no data is 
> received for a while, punctuate won't be called.
> {code}
>  * void init(ProcessorContext context) {
>  * this.context = context;
>  * this.state = context.getStateStore("myTransformState");
>  * context.schedule(1000); // call #punctuate() each 
> 1000ms
>  * }
> {code}
> When you receive new data say after 20 seconds, punctuate will play catch up 
> and will be called 20 times at reception of the new data. 
> the signature of punctuate is
> {code}
> * KeyValue punctuate(long timestamp) {
>  * // can access this.state
>  * // can emit as many new KeyValue pairs as required via 
> this.context#forward()
>  * return null; // don't return result -- can also be 
> "new KeyValue()"
>  * }
> {code}
> but the timestamp being passed is currentTimestamp at the time of the call to 
> punctuate, not at the time the punctuate was scheduled. It is very confusing 
> and I think the timestamp should represent the one at which the punctuate 
> should have been scheduled. Getting the current timestamp is not adding much 
> information as it can easily be obtained using System.currentTimeMillis();



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5825) Streams not processing when exactly once is set

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5825:


> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
>Priority: Major
> Attachments: Tests.scala, build.sbt, log-output.txt, log4j.properties
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology, 
> they use this message to complete a promise that the test is waiting on.  
> Once the promise completes the test verifies the value of the promise as 
> being the expected value of the message.
> +Observed behaviour+
> The first test passes fine, the second test times out, the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82
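> 
> For reference, a transactional message only becomes visible to a
> read_committed consumer (which Streams uses under exactly-once) after the
> commit; a hedged sketch, not the attached test code:
> {code:java}
> producer.initTransactions();
> producer.beginTransaction();
> producer.send(new ProducerRecord<>("input-topic", "key", "value"));
> producer.commitTransaction();
> {code}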



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5825) Streams not processing when exactly once is set

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5825.

Resolution: Not A Bug

> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
>Priority: Major
> Attachments: Tests.scala, build.sbt, log-output.txt, log4j.properties
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology, 
> they use this message to complete a promise that the test is waiting on.  
> Once the promise completes the test verifies the value of the promise as 
> being the expected value of the message.
> +Observed behaviour+
> The first test passes fine, the second test times out, the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7221) Add Build method to StreamsBuilder accepting Properties

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7221.

Resolution: Duplicate

> Add Build method to StreamsBuilder accepting Properties 
> 
>
> Key: KAFKA-7221
> URL: https://issues.apache.org/jira/browse/KAFKA-7221
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams

2019-03-01 Thread Guozhang Wang
Hello Boyang,

I've just made a quick pass on the KIP and here are some thoughts.

Meta:

1. I'm still not sure if it's worthwhile to add a new type of "learner
task" in addition to "standby task": if the only difference is that for the
latter, we would consider workload balance while for the former we would
not, I think we can just adjust the logic of StickyTaskAssignor a bit to
break that difference. Adding a new type of task would be adding a lot of
code complexity, so if we can still piggy-back the logic on a standby-task
I would prefer to do so.

2. One thing that's still not clear from the KIP wiki itself is at which layer
the logic would be implemented. Although for most KIPs we would not
require internal implementation details but only public-facing API updates,
for a KIP like this I think it still requires fleshing out details on the
implementation design. More specifically: today Streams embeds a full-fledged
Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then
injects a StreamsPartitionAssignor into its pluggable PartitionAssignor
interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor
interface whose default implementation is StickyTaskAssignor. Streams'
partition assignor logic today sits in the latter two classes. Hence the
hierarchy today is:

KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
StickyTaskAssignor.
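
For context, the injection point today is just a consumer config; a sketch of
what StreamsConfig effectively sets (shown for illustration only):

    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              StreamsPartitionAssignor.class.getName());

which is why anything below the ConsumerCoordinator cannot be swapped out from
the assignor plug-in alone.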

We need to think about where the proposed implementation would take place
at, and personally I think it is not the best option to inject all of them
into the StreamsPartitionAssignor / StickyTaskAssignor since the logic of
"triggering another rebalance" etc would require some coordinator logic
which is hard to mimic at PartitionAssignor level. On the other hand, since
we are embedding a KafkaConsumer client as a whole we cannot just replace
ConsumerCoordinator with a specialized StreamsCoordinator like Connect does
in KIP-415. So I'd like to maybe split the current proposal into both the
consumer layer and the streams-assignor layer, like we did in KIP-98/KIP-129.
And then the key thing to consider is how to cut off the boundary so that
the modifications we push to ConsumerCoordinator would be beneficial
universally for any consumers, while keep the Streams-specific logic at the
assignor level.

3. Depending on which design direction we choose, our migration plan would
also be quite different. For example, if we stay with ConsumerCoordinator
whose protocol type is "consumer" still, and we can manage to make all
changes agnostic to brokers as well as to old versioned consumers, then our
migration plan could be much easier.

4. I think one major issue related to this KIP is that today, in the
StickyTaskAssignor, we always try to honor stickiness over workload
balance, and hence "learner task" is needed to break this priority, but I'm
wondering if we can have a better solution within the sticky task assignor
that accommodates this?

Minor:

1. The idea of two rebalances has also been discussed in
https://issues.apache.org/jira/browse/KAFKA-6145. So we should add the
reference on the wiki page as well.
2. Could you also add a section describing how the subscription /
assignment metadata will be re-formatted? Without this information it is
hard to get to the bottom of your idea. For example in the "Leader Transfer
Before Scaling" section, I'm not sure why "S2 doesn't know S4 is new member"
and hence would blindly obey stickiness over the workload balance requirement.

Guozhang


On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen  wrote:

> Hey community friends,
>
> I'd like to invite you to have a look at the proposal to add incremental
> rebalancing to Kafka Streams, a.k.a. auto-scaling support.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>
> Special thanks to Guozhang for giving great guidance and important
> feedback while making this KIP!
>
> Best,
> Boyang
>


-- 
-- Guozhang


[jira] [Resolved] (KAFKA-5551) StreamThread should not expose methods for testing

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5551.

Resolution: Duplicate

> StreamThread should not expose methods for testing
> --
>
> Key: KAFKA-5551
> URL: https://issues.apache.org/jira/browse/KAFKA-5551
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{StreamsThread}} currently exposes {{createStreamTask()}} and 
> {{createStandbyTask()}} as {{protected}} in order to inject "test tasks" in 
> unit tests. We should rework this and make both methods {{private}}. Maybe we 
> can introduce a {{TaskSupplier}} similar to {{KafkaClientSupplier}} (however, 
> {{TaskSupplier}} should not be public API and be in package {{internal}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5551) StreamThread should not expose methods for testing

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5551:


> StreamThread should not expose methods for testing
> --
>
> Key: KAFKA-5551
> URL: https://issues.apache.org/jira/browse/KAFKA-5551
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{StreamsThread}} currently exposes {{createStreamTask()}} and 
> {{createStandbyTask()}} as {{protected}} in order to inject "test tasks" in 
> unit tests. We should rework this and make both methods {{private}}. Maybe we 
> can introduce a {{TaskSupplier}} similar to {{KafkaClientSupplier}} (however, 
> {{TaskSupplier}} should not be public API and be in package {{internal}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7628.
--
Resolution: Not A Problem

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after closing the streams, these connections are still 
> established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
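> 
> As a hedged sketch (not from this report), a state listener can confirm the
> instance actually reached NOT_RUNNING before the reference is dropped:
> {code:java}
> kafkaStream.setStateListener((newState, oldState) -> {
>     if (newState == KafkaStreams.State.NOT_RUNNING) {
>         logger.info("KafkaStream fully stopped");
>     }
> });
> {code}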
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-7628:
--

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after closing the streams, these connections are still 
> established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleConsumer with deserializer set to `WindowedDeserializer`

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6592.

Resolution: Duplicate

> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException was thrown because 
> the inner deserializer was never initialized: there is no place in 
> ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3429

2019-03-01 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: ConsumerNetworkClient does not need to send the remaining

[wangguoz] HOTFIX: add igore import to streams_upgrade_test

--
[...truncated 2.31 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STAR

Build failed in Jenkins: kafka-1.1-jdk7 #250

2019-03-01 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: add igore import to streams_upgrade_test

--
[...truncated 1.94 MB...]
org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled 
STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingConsumerIsolationLevelIfEosDisabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingConsumerIsolationLevelIfEosDisabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeBackwardsCompatibleWithDeprecatedConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeBackwardsCompatibleWithDeprecatedConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > shouldAcceptExactlyOnce STARTED

org.apache.kafka.streams.StreamsConfigTest > shouldAcceptExactlyOnce PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldForwardCustomConfigsWithNoPrefixToAllClients STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldForwardCustomConfigsWithNoPrefixToAllClients PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectValueSerdeClassOnError STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectValueSerdeClassOnError PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldS

[jira] [Resolved] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4953.

Resolution: Duplicate

> Global Store: cast exception when initialising with in-memory logged state 
> store
> 
>
> Key: KAFKA-4953
> URL: https://issues.apache.org/jira/browse/KAFKA-4953
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Yennick Trevels
>Priority: Major
>  Labels: user-experience
>
> Currently it is not possible to initialise a global store with an in-memory 
> *logged* store via the TopologyBuilder. This results in the following 
> exception:
> {code}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl 
> cannot be cast to 
> org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
>   at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
>   ...
> {code}
> I've created a PR which includes a unit test to verify this behavior.
> If the below PR gets merged, the fixing PR should leverage the provided test 
> {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing 
> the {{@ignore}} annotation.
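> 
> As a hedged workaround sketch with the newer StreamsBuilder API (store and
> topic names illustrative): global stores must not be change-logged, so an
> in-memory store works once logging is disabled explicitly:
> {code:java}
> builder.globalTable("topic", Materialized
>     .<String, String>as(Stores.inMemoryKeyValueStore("global-store"))
>     .withLoggingDisabled());
> {code}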



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4953:


> Global Store: cast exception when initialising with in-memory logged state 
> store
> 
>
> Key: KAFKA-4953
> URL: https://issues.apache.org/jira/browse/KAFKA-4953
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Yennick Trevels
>Priority: Major
>  Labels: user-experience
>
> Currently it is not possible to initialise a global store with an in-memory 
> *logged* store via the TopologyBuilder. This results in the following 
> exception:
> {code}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl 
> cannot be cast to 
> org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
>   at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
>   ...
> {code}
> I've created a PR which includes a unit test to verify this behavior.
> If the below PR gets merged, the fixing PR should leverage the provided test 
> {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing 
> the {{@ignore}} annotation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4732) Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4732.

Resolution: Duplicate

> Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]
> -
>
> Key: KAFKA-4732
> URL: https://issues.apache.org/jira/browse/KAFKA-4732
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:259)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:221)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:190)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:295)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4721:


> KafkaStreams (and possibly others) should inherit Closeable
> ---
>
> Key: KAFKA-4721
> URL: https://issues.apache.org/jira/browse/KAFKA-4721
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Major
>  Labels: needs-kip
>
> KafkaStreams should inherit AutoCloseable or Closeable so that you can use 
> try-with-resources:
> {code}
> try (KafkaStreams reader = storage.createStreams(builder)) {
> reader.start();
> stopCondition.join();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4732) Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4732:


> Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]
> -
>
> Key: KAFKA-4732
> URL: https://issues.apache.org/jira/browse/KAFKA-4732
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:259)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:221)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:190)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:295)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5063:


> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: unit-test
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5063.

Resolution: Cannot Reproduce

> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: unit-test
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4186:


> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
> 
> Re-opening this issue with a different test, but it seems the test suite 
> itself has a timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata 
> for topic=output-9 partition=0 not propagated to all brokers
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
> ...
> {code}
> Example: 
> https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4281) Should be able to forward aggregation values immediately

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4281:


> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this, unfortunately, a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)
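
As context for readers: the per-record forwarding requested here can be approximated by disabling the KIP-63 record cache globally. A minimal sketch, assuming the standard StreamsConfig key (per-store cache control came later via the DSL):
{code:java}
// Sketch: disable the record cache so aggregation updates are forwarded
// downstream per record instead of on flush/commit.
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class NoCacheConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-latency-agg");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 0 bytes of cache: every update is emitted immediately, at the cost
        // of more downstream traffic and more state store writes.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }
}
{code}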



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4186.

Resolution: Cannot Reproduce

> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
> 
> Re-opening this issue with a different test, but it seems the test suite 
> itself has a timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata 
> for topic=output-9 partition=0 not propagated to all brokers
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
> ...
> {code}
> Example: 
> https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4281) Should be able to forward aggregation values immediately

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4281.

Resolution: Duplicate

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency-sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this, unfortunately, a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3826:


> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics for EACH processed 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling the messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.
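
To make the proposal concrete, a hedged sketch of record-level sampling (none of these names are Streams internals; the sample rate is arbitrary):
{code:java}
// Hypothetical sketch of statistical sampling for per-record latency metrics.
import java.util.concurrent.ThreadLocalRandom;

public class SampledLatencyRecorder {
    private final double sampleRate; // e.g. 0.01 => sample ~1% of records

    public SampledLatencyRecorder(double sampleRate) {
        this.sampleRate = sampleRate;
    }

    public void maybeRecord(long startNs, long endNs) {
        // Pay the metrics-recording cost only for a random subset of records.
        if (ThreadLocalRandom.current().nextDouble() < sampleRate) {
            recordLatency(endNs - startNs);
        }
    }

    private void recordLatency(long latencyNs) {
        // In a real implementation this would feed a metrics Sensor.
    }
}
{code}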



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3826.

Resolution: Duplicate

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics for EACH processed 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling the messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3884) KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3884.

Resolution: Duplicate

> KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging
> -
>
> Key: KAFKA-3884
> URL: https://issues.apache.org/jira/browse/KAFKA-3884
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> Build failed after 180 minutes and the last 2 lines were:
> {code}
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldReduce PASSED
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldAggregate STARTED
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/712/console



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4849) Bug in KafkaStreams documentation

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4849.

Resolution: Duplicate  (was: Fixed)

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated as of Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again, zookeeper.connect is deprecated.
> 2. client.id and zookeeper.connect are marked as high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> neither of them is required to initialize the stream.
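
For reference, a minimal non-deprecated configuration as of 0.10.2, matching the point above that only two settings are required (names are the public StreamsConfig constants):
{code:java}
// Minimal Kafka Streams configuration for 0.10.2+, without the deprecated
// zookeeper.connect setting.
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsSettings {
    public static Properties build() {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // required
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // required
        // client.id is optional; StreamsConfig.ZOOKEEPER_CONNECT_CONFIG is deprecated.
        return settings;
    }
}
{code}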



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4721.

Resolution: Duplicate

> KafkaStreams (and possibly others) should inherit Closeable
> ---
>
> Key: KAFKA-4721
> URL: https://issues.apache.org/jira/browse/KAFKA-4721
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Major
>  Labels: needs-kip
>
> KafkaStreams should inherit AutoCloseable or Closeable so that you can use 
> try-with-resources:
> {code}
> try (KafkaStreams reader = storage.createStreams(builder)) {
> reader.start();
> stopCondition.join();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5122:
--

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000 ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> • Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).
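
For readers debugging similar off-heap growth: the custom setter mentioned in the report plugs in through the rocksdb.config.setter property. A sketch of its shape (the compaction setting mirrors the report; everything else is a placeholder):
{code:java}
// Sketch of a RocksDBConfigSetter like the one described above.
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // As in the report: scale background compactions with available cores.
        options.setMaxBackgroundCompactions(Runtime.getRuntime().availableProcessors());
    }
}
{code}
It is registered via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG. Note that RocksDB block cache and index memory configured this way live off-heap, which is why JVM heap monitoring alone will not surface this kind of growth.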



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5122.
--
Resolution: Not A Problem

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000 ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> • Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] 2.2.0 RC1

2019-03-01 Thread Matthias J. Sax
Thanks for the correction!

-Matthias

On 3/1/19 12:13 PM, Kevin Lu wrote:
> Hi Matthias,
> 
> The Confluence release page is correct, but the “Add --under-min-isr
> option to describe topics command” item was pushed back to 2.3.0.
> 
> Thanks,
> Kevin
> 
> On Fri, Mar 1, 2019 at 11:48 AM Matthias J. Sax 
> wrote:
> 
>> Hello Kafka users, developers and client-developers,
>>
>> This is the second candidate for release of Apache Kafka 2.2.0.
>>
>>  - Added SSL support for custom principal name
>>  - Allow SASL connections to periodically re-authenticate
>>  - Improved consumer group management
>>- default group.id is `null` instead of empty string
>>  - Add --under-min-isr option to describe topics command
>>  - API improvement
>>- Producer: introduce close(Duration)
>>- AdminClient: introduce close(Duration)
>>- Kafka Streams: new flatTransform() operator in Streams DSL
>>- KafkaStreams (and other classes) now implement AutoCloseable to
>> support try-with-resources
>>- New Serdes and default method implementations
>>  - Kafka Streams exposed internal client.id via ThreadMetadata
>>  - Metric improvements: all `-min`, `-avg` and `-max` metrics will now
>> output `NaN` as the default value
>>
>> Release notes for the 2.2.0 release:
>> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Thursday, March 7, 9am PST
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> https://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>>
>> * Javadoc:
>> https://home.apache.org/~mjsax/kafka-2.2.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 2.2 branch) is the 2.2.0 tag:
>> https://github.com/apache/kafka/releases/tag/2.2.0-rc1
>>
>> * Documentation:
>> https://kafka.apache.org/22/documentation.html
>>
>> * Protocol:
>> https://kafka.apache.org/22/protocol.html
>>
>> * Jenkins builds for the 2.2 branch:
>> Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/
>>
>> * System tests for the 2.2 branch:
>> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/
>>
>>
>> /**
>>
>> Thanks,
>>
>>
>> -Matthias
>>
>>
> 



signature.asc
Description: OpenPGP digital signature
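
As a quick illustration of the close(Duration) additions listed above (topic and
broker address are placeholders):

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CloseWithTimeout {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            // New in 2.2: bounded shutdown via Duration, replacing close(long, TimeUnit).
            producer.close(Duration.ofSeconds(5));
        }
    }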


[jira] [Reopened] (KAFKA-3884) KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3884:


> KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging
> -
>
> Key: KAFKA-3884
> URL: https://issues.apache.org/jira/browse/KAFKA-3884
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> Build failed after 180 minutes and the last 2 lines were:
> {code}
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldReduce PASSED
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldAggregate STARTED
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/712/console



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-7652:
--

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The largest share of CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that we appear to be performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  
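
For reproduction context, a sketch of a session-windowed aggregation using the cache and commit settings from the report (topic names, serdes, and the 5-minute inactivity gap are placeholders):
{code:java}
// Illustrative session-windowed reduce with the report's cache/commit settings.
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionStoreRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);       // as in the report
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // as in the report

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.groupByKey()
             .windowedBy(SessionWindows.with(Duration.ofMinutes(5))) // gap is illustrative
             .reduce((v1, v2) -> v1 + v2); // materializes a session store

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}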



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8029) Add in-memory bytes-only session store implementation

2019-03-01 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8029:


 Summary: Add in-memory bytes-only session store implementation
 Key: KAFKA-8029
 URL: https://issues.apache.org/jira/browse/KAFKA-8029
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Sophie Blee-Goldman


As titled. We've added the in-memory key-value store and window store 
implementations; what's left is the session store now.
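
For orientation, the existing in-memory factory on the public Stores class that the new supplier would sit beside; the session-store variant shown in the comment is the proposal and hypothetical at this point:
{code:java}
// Existing in-memory bytes store supplier; this ticket proposes the
// session-store analogue.
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStores {
    public static KeyValueBytesStoreSupplier keyValue() {
        return Stores.inMemoryKeyValueStore("my-kv-store");
    }
    // Hypothetical shape of the proposed API:
    // SessionBytesStoreSupplier session =
    //     Stores.inMemorySessionStore("my-session-store", retentionPeriod);
}
{code}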



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-431: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2019-03-01 Thread Colin McCabe
+1, looks like a good improvement!

Best,
Colin

On Mon, Feb 18, 2019, at 14:00, Mateusz Zakarczemny wrote:
> Hi all,
> 
> I have created a KIP to support additional message fields in console
> consumer:
> KIP-431 - Support of printing additional ConsumerRecord fields in
> DefaultMessageFormatter
> 
> 
> The main purpose of the proposed change is to allow printing the message
> offset, partition, and headers in the console consumer. The changes are backward
> compatible and impact only console consumer parameters.
> 
> PR: https://github.com/apache/kafka/pull/4807
> Jira ticket: KAFKA-6733 
> 
> I'm waiting for your feedback.
> 
> Regards,
> Mateusz Zakarczemny
> 
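
For readers skimming the archive: the DefaultMessageFormatter already accepts
per-run properties such as print.key and print.timestamp; the KIP proposes
analogous switches for offset, partition, and headers. An illustrative
invocation (the three new property names follow the KIP draft and are
hypothetical until merged):

    # existing, shipped properties
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test \
      --property print.key=true --property print.timestamp=true

    # proposed additions per KIP-431 (hypothetical at the time of this thread)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test \
      --property print.offset=true --property print.partition=true \
      --property print.headers=true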


Build failed in Jenkins: kafka-2.2-jdk8 #41

2019-03-01 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: add igore import to streams_upgrade_test

--
[...truncated 2.73 MB...]

kafka.admin.TopicCommandWithAdminClientTest > testCreate PASSED

kafka.admin.TopicCommandWithAdminClientTest > testConfigOptWithBootstrapServers 
STARTED

kafka.admin.TopicCommandWithAdminClientTest > testConfigOptWithBootstrapServers 
PASSED

kafka.admin.TopicCommandWithAdminClientTest > 
testAlterWithUnspecifiedPartitionCount STARTED

kafka.admin.TopicCommandWithAdminClientTest > 
testAlterWithUnspecifiedPartitionCount PASSED

kafka.admin.TopicCommandWithAdminClientTest > testDeleteIfExists STARTED

kafka.admin.TopicCommandWithAdminClientTest > testDeleteIfExists PASSED

kafka.admin.TopicCommandWithAdminClientTest > testDescribe STARTED

kafka.admin.TopicCommandWithAdminClientTest > testDescribe PASSED

kafka.admin.TopicCommandWithAdminClientTest > testAlterAssignment STARTED

kafka.admin.TopicCommandWithAdminClientTest > testAlterAssignment PASSED

kafka.admin.DelegationTokenCommandTest > testDelegationTokenRequests STARTED

kafka.admin.DelegationTokenCommandTest > testDelegationTokenRequests PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeMembersOfNonExistingGroup 
STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeMembersOfNonExistingGroup 
PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeStateOfExistingGroup STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeStateOfExistingGroup PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeExistingGroup STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeExistingGroup PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeStateWithConsumersWithoutAssignedPartitions STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeStateWithConsumersWithoutAssignedPartitions PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeWithMultipleSubActions 
STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeWithMultipleSubActions 
PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupOffsetsWithShortInitializationTimeout STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupOffsetsWithShortInitializationTimeout PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeOffsetsOfExistingGroupWithNoMembers STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeOffsetsOfExistingGroupWithNoMembers PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupMembersWithShortInitializationTimeout STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupMembersWithShortInitializationTimeout PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeMembersOfExistingGroup 
STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeMembersOfExistingGroup 
PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeOffsetsOfExistingGroup 
STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeOffsetsOfExistingGroup 
PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeOffsetsWithConsumersWithoutAssignedPartitions STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeOffsetsWithConsumersWithoutAssignedPartitions PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeWithMultiPartitionTopicAndMultipleConsumers STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeWithMultiPartitionTopicAndMultipleConsumers PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeSimpleConsumerGroup STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeSimpleConsumerGroup PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeStateOfNonExistingGroup 
STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeStateOfNonExistingGroup 
PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersWithConsumersWithoutAssignedPartitions STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersWithConsumersWithoutAssignedPartitions PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersOfExistingGroupWithNoMembers STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersOfExistingGroupWithNoMembers PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeWithUnrecognizedNewConsumerOption STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeWithUnrecognizedNewConsumerOption PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupStateWithShortInitializationTimeout STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeGroupStateWithShortInitializationTimeout PASSED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers STARTED

kafka.admin.DescribeConsumerGroupTest > 
testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers PASSED

kafka.admin.DescribeConsumerGroupTest > testDescribeNonOffsetCommitGroup STARTED

kafka.admin.DescribeConsumerGroupTest > testDescribeNonOffsetCom

Re: [DISCUSS] KIP-426: Persist Broker Id to Zookeeper

2019-03-01 Thread Harsha
Hi,
 Cluster management tools are generic and are not aware of Kafka-specific 
configs like broker.id. Even if they are aware of broker.ids, those ids will be 
lost when a disk is lost. Irrespective of these use cases, let's look at the 
problem in isolation:
1. Disks are the most common failure case in Kafka clusters.
2. We store the auto-generated broker.id on disk, hence we lose the broker.id 
mapping when disks fail.
3. If we keep the previously generated broker.id-to-host mapping in ZooKeeper, 
it is easy to retrieve on a new host. This removes the reassignment step and 
lets us just copy the data and start the new node with the previous broker.id, 
which is what the KIP is proposing.
I want to understand your concerns about moving this mapping, which already 
exists on disk, into ZooKeeper.

Thanks,
Harsha
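
(For concreteness: the on-disk mapping discussed here is the meta.properties
file Kafka writes into each log directory. An illustrative example, with a
made-up broker.id value:

    # <log.dir>/meta.properties
    version=0
    broker.id=1001

When every disk on a host is lost, this file is lost with it, which is exactly
the gap the KIP targets.)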

On Fri, Mar 1, 2019, at 11:11 AM, Colin McCabe wrote:
> On Wed, Feb 27, 2019, at 14:12, Harsha wrote:
> > Hi Colin,
> >   What we want is to preserve the broker.id so that we 
> > can do an offline rebuild of a broker. In our case, bringing a failed 
> > node back up through online Kafka replication puts producer latencies 
> > at risk, given the new broker will keep all the other leaders busy 
> > with its replication requests. For an offline rebuild, we do not 
> > need to do a rebalance as long as we can recover the broker.id.
> >   Overall, irrespective of this use case, we still want the 
> > ability to retrieve the broker.id for an existing host. This will make 
> > it easier to swap a failed host with a new one that keeps the existing 
> > hostname.
> 
> Thanks for the explanation.  Shouldn't this be handled by the 
> cluster management tool, though?  Kafka doesn't include a mechanism for 
> re-creating nodes that failed.  That's up to kubernetes, or ansible, or 
> whatever cluster provisioning framework you have in place.  This feels 
> like the same kind of thing: managing how the cluster is provisioned.
> 
> best,
> Colin
> 
> > 
> > Thanks,
> > Harsha
> > On Wed, Feb 27, 2019, at 11:53 AM, Colin McCabe wrote:
> > > Hi Li,
> > > 
> > >  > The mechanism simplifies deployment because the same configuration can be 
> > >  > used across all brokers; however, in a large system where disk failure is 
> > >  > a norm, the meta file could often get lost, causing a new broker id to be 
> > >  > allocated. This is problematic because the new broker id has no partitions 
> > >  > assigned to it, so it can’t do anything, while partitions assigned to 
> > >  > the old one lose one replica.
> > > 
> > > If all of the disks have failed, then the partitions will lose their 
> > > replicas no matter what, right?  If any of the disks is still around, 
> > > then there will be a meta file on the disk which contains the previous 
> > > broker ID.  So I'm not sure that we need to change anything here.
> > > 
> > > best,
> > > Colin
> > > 
> > > 
> > > On Tue, Feb 5, 2019, at 14:38, Li Kan wrote:
> > > > Hi, I have KIP-426, which is a small change to automatically determine the
> > > > broker id when starting up. I am new to Kafka, so there are a bunch of
> > > > design trade-offs that I might be missing or find hard to decide, so I'd
> > > > like to get some suggestions on it. I expect (and am open) to modify (or
> > > > even totally rewrite) the KIP based on suggestions. Thanks.
> > > > 
> > > > -- 
> > > > Best,
> > > > Kan
> > > >
> > >
> >
>


Jenkins build is back to normal : kafka-trunk-jdk8 #3430

2019-03-01 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-2.2-jdk8 #42

2019-03-01 Thread Apache Jenkins Server
See