Re: [DISCUSS] KIP-280: Enhanced log compaction

2018-10-22 Thread Luís Cabral
 Since this is not moving forward, how about I proceed with the currently 
documented changes, and any improvements (such as configuration changes) can be 
taken up afterwards by whoever wants them, under a different KIP?
On Thursday, October 11, 2018, 9:47:12 AM GMT+2, Luís Cabral 
 wrote:  
 
  Hi Matthias,

How can this be done?

Kind Regards,
Luis
 
On Sunday, September 30, 2018, 9:10:01 PM GMT+2, Matthias J. Sax 
 wrote:  
 
 Luis,

What is the status of this KIP?

I tend to agree that introducing the feature only globally might be
less useful (I would assume that people want to turn it on on a
per-topic basis). As I am not familiar with the corresponding code, I
cannot judge the complexity of adding topic-level configs; however, it
seems worth including it in the KIP.


-Matthias



On 9/21/18 1:59 PM, Bertus Greeff wrote:
> Someone pointed out to me that my scenario is also resolved by using Kafka 
> transactions.  Zombie fencing, which is essentially my scenario, was one of the 
> scenarios that transactions were designed to solve.  I was going to use the 
> ideas of this KIP to solve it, but using transactions seems even better 
> because out-of-order messages never even make it into the topic.  They are 
> blocked by the broker.
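
A minimal sketch of that pattern (the topic name, transactional.id, and broker
address below are only illustrative), showing where the broker fences a zombie
instance:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalWriter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // All instances of the same logical writer share this id; the broker uses it for fencing.
        props.put("transactional.id", "my-writer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // bumps the epoch and fences older producers with the same id
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // A newer instance took over: this one is the "zombie" and its writes are rejected.
            producer.close();
        }
    }
}
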
> 
> -Original Message-
> From: Guozhang Wang  
> Sent: Saturday, September 1, 2018 11:33 AM
> To: dev 
> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> 
> Hello Luis,
> 
> Thanks for your thoughtful responses, here are my two cents:
> 
> 1) I think having the new configs with per-topic granularity would not 
> introduce much memory overhead or logic complexity, as all you need is to 
> remember this at the topic metadata cache. If I've missed some details about 
> the complexity, could you elaborate a bit more?
> 
> 2) I agree with you: the current `ConfigDef.Validator` only has scope over the 
> validated config value itself, and hence cannot depend on another 
> config.
> 
> 4) I think Jun's point is that since we need the latest message in the log 
> segment for the timestamp tracking, we cannot actually delete it: with the 
> offset-based-only policy this is naturally guaranteed, but with the other 
> policies it is not guaranteed to never be compacted away, and hence we need 
> to special-handle this case and not delete it.
> 
> 
> 
> Guozhang
> 
> 
> 
> Guozhang
> 
> 
> On Wed, Aug 29, 2018 at 9:25 AM, Luís Cabral 
> wrote:
> 
>> Hi all,
>>
>> Since there has been a rejuvenated interest in this KIP, it felt 
>> better to downgrade it back down to [DISCUSSION], as we aren't really 
>> voting on it anymore.
>> I'll try to address the currently pending questions on the following 
>> points, so please bear with me while we go through them all:
>>
>> 1) Configuration: Topic vs Global
>>
>> Here we all agree that having a configuration per-topic is the best 
>> option. However, this is not possible with the current design of the 
>> compaction solution. Yes, it is true that "some" properties that 
>> relate to compaction are configurable per-topic, but those are only 
>> the properties that act outside(!) of the actual compaction logic, 
>> such as configuring the start-compaction trigger with "ratio" or 
>> compaction duration with "lag.ms".
>> This logic can, of course, be re-designed to suit our wishes, but this 
>> is not a direct effort, and if we have spent months arguing about the 
>> extra 8 bytes per record, for sure we would spend another few dozen 
>> months discussing the memory impact that doing this change to the 
>> properties will invariably have.
>> As such, I will limit this KIP to ONLY have these configurations globally.
>>
>> 2) Configuration: Fail-fast vs Fallback
>>
>>
>> Ideally, I would also like to prevent the application from starting if the 
>> configuration is somehow invalid.
>> However (another 'however'), the way the configuration is handled 
>> prevents adding dependencies between them, so we can't add logic that 
>> says "configuration X is invalid if configuration Y is so-and-such".
>> Again, this can be re-designed to add this feature to the 
>> configuration logic, but it would again be a big change just by 
>> itself, so this KIP is again limited to use ONLY what is already in place.
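
A minimal sketch of that limitation, assuming the standard ConfigDef.Validator
interface (the strategy values below are only illustrative):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// The validator is handed a single (name, value) pair, so it can check that a value
// is well-formed, but it has no access to any other configuration entry.
public class CompactionStrategyValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        String strategy = (String) value;
        if (!"offset".equals(strategy) && !"timestamp".equals(strategy) && !"header".equals(strategy)) {
            throw new ConfigException(name, value, "Unknown compaction strategy");
        }
        // "X is invalid if Y is so-and-such" cannot be checked here: Y is not visible.
    }
}
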
>>
>> 3) Documenting the memory impact on the KIP
>>
>> This is now added to the KIP, though this topic is more complicated 
>> than 'memory impact'. E.g.: this change doesn't translate to an actual 
>> memory impact, it just means that the compaction will potentially 
>> encompass fewer records per execution.
>>
>> 4) Documenting how we deal with the last message in the log
>>
>> I have 2 interpretations for this request: "the last message in the log"
>> or "the last message with a shared key on the log"
>> For the former: there is no change to the logic on how the last 
>> message is handled. Only the "tail" gets compacted, so the "head" 
>> (which includes the last message) still keeps the last message
>>
>> 5) Documenti

Re: [VOTE] KIP-331 Add default implementation to close() and configure() for Serializer, Deserializer and Serde

2018-10-22 Thread Damian Guy
Thanks +1 binding

On Sun, 7 Oct 2018 at 03:16, Chia-Ping Tsai  wrote:

> (just bump)
>
> We have following votes:
>
> non-binding
> 1. John Roesler
> 2. vito jeng
> 3. Richard Yu
> 4.  Bill Bejeck
> 5. Satish Duggana
>
> binding
> 1. Matthias J. Sax
> 2. Ismael Juma
>
> it needs one more ticket :)
>
> Cheers,
> Chia-Ping
>
> On 2018/07/05 14:45:01, Chia-Ping Tsai  wrote:
> > hi all,
> >
> > I would like to start voting on "KIP-331 Add default implementation to
> close() and configure() for Serializer, Deserializer and Serde"
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde
> >
> > Cheers,
> > Chia-Ping
> >
>


Build failed in Jenkins: kafka-2.1-jdk8 #36

2018-10-22 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-7501: Fix producer batch double deallocation when receiving

--
[...truncated 182.80 KB...]
kafka.api.AuthorizerIntegrationTest > 
testIdempotentProducerNoIdempotentWriteAclInProduce PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionOnTopicToReadFromNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionOnTopicToReadFromNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission PASSED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testDeleteTopicsWithWildCardAuth STARTED

kafka.api.AuthorizerIntegrationTest > testDeleteTopicsWithWildCardAuth PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend
 STARTED

kafka.api.AuthorizerIntegrationTest > 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend
 PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionOnTopicToWriteToNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionOnTopicToWriteToNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testMetadataWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testMetadataWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testCreatePartitionsWithWildCardAuth 
STARTED

kafka.api.AuthorizerIntegrationTest > testCreatePartitionsWithWildCardAuth 
PASSED

kafka.api.AuthorizerIntegrationTest > 
testUnauthorizedDeleteTopicsWithoutDescribe STARTED

kafka.api.AuthorizerIntegrationTest > 
testUnauthorizedDeleteTopicsWithoutDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testDescribeGroupApiWithGroupDescribe 
STARTED

kafka.api.AuthorizerIntegrationTest > testDescribeGroupApiWithGroupDescribe 
PASSED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
STARTED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > 
testListGroupApiWithAndWithoutListGroupAcls STARTED

kafka.api.AuthorizerIntegrationTest > 
testListGroupApiWithAndWithoutListGroupAcls PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionMetadataRequestAutoCreate STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionMetadataRequestAutoCreate PASSED

kafka.api.AuthorizerIntegrationTest > testDeleteGroupApiWithNoDeleteGroupAcl2 
STARTED

kafka.api.AuthorizerIntegrationTest > testDeleteGroupApiWithNoDeleteGroupAcl2 
PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic PASSED

kafka.api.AuthorizerIntegrationTest > testFetchAllOffsetsTopicAuthorization 
STARTED

kafka.api.AuthorizerIntegrationTest > testFetchAllOffsetsTopicAuthorization 
PASSED

kafka.api.AuthorizerIntegrationTest > 
shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL STARTED

kafka.api.AuthorizerIntegrationTest > 
shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > 
testOffsetsForLeaderEpochClusterPermission STARTED

kafka.api.AuthorizerIntegrationTest > 
testOffsetsForLeaderEpochClusterPermission PASSED

kafka.api.AuthorizerIntegrationTest > 
testTran

Re: [DISCUSS] KIP-351: Add --under-min-isr option to describe topics command

2018-10-22 Thread Mickael Maison
It might be worth syncing with KIP-377 which is already planning to
make TopicCommand use the AdminClient and add a --bootstrap-server
argument.

Also, in the proposed changes section, you mention the challenge of
finding the topic min ISR configuration. Using the
AdminClient.describeConfigs() API,
you directly get the "computed" configuration for topics. If the topic
is using the default config from the broker, the configuration source
will be set to "DEFAULT_CONFIG". In case the configuration was
specified during creation, the source will be set to
"DYNAMIC_TOPIC_CONFIG". So there's no need to query ZooKeeper.
On Fri, Oct 19, 2018 at 5:02 PM Kevin Lu  wrote:
>
> Bumping this as I have added some additional details.
>
> This change will require adding a "--bootstrap-server" flag to identify the
> current broker/cluster configured "min.insync.replicas".
>
> Regards,
> Kevin
>
> On Fri, Oct 12, 2018 at 4:19 PM Kevin Lu  wrote:
>
> > Hi All,
> >
> > After some feedback, I have reformulated KIP-351
> > 
> > .
> >
> > This KIP proposes an additional "--under-min-isr" option in TopicCommand
> > to show topic partitions which are under the configured
> > "min.insync.replicas" to help operators identify which topic partitions
> > need immediate fixing.
> >
> > Please take a look and provide some feedback!
> >
> > Thanks!
> >
> > Regards,
> > Kevin
> >


Jenkins build is back to normal : kafka-trunk-jdk8 #3153

2018-10-22 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-7528) Make Min and Max metrics' default value consistent with each other

2018-10-22 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7528:
--

 Summary: Make Min and Max metrics' default value consistent with 
each other
 Key: KAFKA-7528
 URL: https://issues.apache.org/jira/browse/KAFKA-7528
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski
 Fix For: 2.2.0


KIP-386: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[DISCUSS] KIP-386: Make Min metrics' default value consistent with Max metrics

2018-10-22 Thread Stanislav Kozlovski
Hey everybody,

I've opened up a very short KIP to make the Max and Min metrics' default
values consistent with each other.

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics
JIRA: https://issues.apache.org/jira/browse/KAFKA-7528

This is hopefully a very straightforward change. Please provide feedback.
Thanks

-- 
Best,
Stanislav


Build failed in Jenkins: kafka-2.1-jdk8 #37

2018-10-22 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-7519 Clear pending transaction state when expiration fails 
(#5820)

--
[...truncated 427.96 KB...]

kafka.admin.PreferredReplicaElectionCommandTest > 
testBasicPreferredReplicaElection STARTED

kafka.admin.PreferredReplicaElectionCommandTest > 
testBasicPreferredReplicaElection PASSED

kafka.admin.PreferredReplicaElectionCommandTest > testPreferredReplicaJsonData 
STARTED

kafka.admin.PreferredReplicaElectionCommandTest > testPreferredReplicaJsonData 
PASSED

kafka.admin.DelegationTokenCommandTest > testDelegationTokenRequests STARTED

kafka.admin.DelegationTokenCommandTest > testDelegationTokenRequests PASSED

kafka.admin.BrokerApiVersionsCommandTest > checkBrokerApiVersionCommandOutput 
STARTED

kafka.admin.BrokerApiVersionsCommandTest > checkBrokerApiVersionCommandOutput 
PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithCleaner STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicWithCleaner PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover STARTED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower STARTED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicAlreadyMarkedAsDeleted STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicAlreadyMarkedAsDeleted PASSED

kafka.admin.DeleteTopicTest > testIncreasePartitionCountDuringDeleteTopic 
STARTED

kafka.admin.DeleteTopicTest > testIncreasePartitionCountDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic STARTED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic PASSED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion STARTED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion PASSED

kafka.admin.DeleteTopicTest > testDisableDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testDisableDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition PASSED

kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic STARTED

kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic PASSED

kafka.admin.AdminTest > testGetBrokerMetadatas STARTED

kafka.admin.AdminTest > testGetBrokerMetadatas PASSED

kafka.admin.AdminTest > testBootstrapClientIdConfig STARTED

kafka.admin.AdminTest > testBootstrapClientIdConfig PASSED

kafka.admin.AdminTest > testManualReplicaAssignment STARTED

kafka.admin.AdminTest > testManualReplicaAssignment PASSED

kafka.admin.AdminTest > testConcurrentTopicCreation STARTED

kafka.admin.AdminTest > testConcurrentTopicCreation PASSED

kafka.admin.AdminTest > testTopicCreationWithCollision STARTED

kafka.admin.AdminTest > testTopicCreationWithCollision PASSED

kafka.admin.AdminTest > testTopicCreationInZK STARTED

kafka.admin.AdminTest > testTopicCreationInZK PASSED

kafka.admin.AclCommandTest > testAclCliWithAuthorizer STARTED

kafka.admin.AclCommandTest > testAclCliWithAuthorizer PASSED

kafka.admin.AclCommandTest > testInvalidAuthorizerProperty STARTED

kafka.admin.AclCommandTest > testInvalidAuthorizerProperty PASSED

kafka.admin.AclCommandTest > testAclsOnPrefixedResourcesWithAdminAPI STARTED

kafka.admin.AclCommandTest > testAclsOnPrefixedResourcesWithAdminAPI PASSED

kafka.admin.AclCommandTest > testPatternTypes STARTED

kafka.admin.AclCommandTest > testPatternTypes PASSED

kafka.admin.AclCommandTest > testProducerConsumerCliWithAdminAPI STARTED

kafka.admin.AclCommandTest > testProducerConsumerCliWithAdminAPI PASSED

kafka.admin.AclCommandTest > testAclsOnPrefixedResourcesWithAuthorizer STARTED

kafka.admin.AclCommandTest > testAclsOnPrefixedResourcesWithAuthorizer PASSED

kafka.admin.AclCommandTest > testProducerConsumerCliWithAuthorizer STARTED

kafka.admin.AclCommandTest > testProducerConsumerCliWithAuthorizer PASSED

kafka.admin.AclCommandTest > testAclCliWithAdminAPI STARTED

kafka.admin.AclCommandTest > testAclCliWithAdminAPI PASSED

kafka.admin.ConfigCommandTest > testScramCredentials STARTED

kafka.admin.ConfigCommandTest > testScramCredentials PASSED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType STARTED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType PASSED

kafka.admin.ConfigCommandTest > shouldAddBr

Build failed in Jenkins: kafka-trunk-jdk8 #3154

2018-10-22 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-7519 Clear pending transaction state when expiration fails 
(#5820)

--
[...truncated 2.83 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafk

request wiki permissions

2018-10-22 Thread Pellerin, Clement
Can you give wiki permissions to wiki ID cpellerin?
I was able to create and edit KIP-383 last week but it no longer works for me.



Re: request wiki permissions

2018-10-22 Thread Jun Rao
Clement,

Thanks for your interest. Just gave you the permission.

Jun

On Mon, Oct 22, 2018 at 10:00 AM, Pellerin, Clement <
clement_pelle...@ibi.com> wrote:

> Can you give wiki permissions to wiki ID cpellerin?
> I was able to create and edit KIP-383 last week but it no longer works for
> me.
>
>


KAFKA-3932 - Consumer fails to consume in a round robin fashion

2018-10-22 Thread ChienHsing Wu
Hi,



I encountered the issue documented in the JIRA KAFKA-3932. Upon studying the 
source code and the KIP, I think the issue is this statement in the KIP: "As 
before, we'd keep track of which partition we left off at so that the next 
iteration would begin there." I think it should NOT use the last partition in 
the next iteration; it should pick the next one instead.

If this behavior is agreeable, the simplest way to impose the order in which the 
next partition is picked is to use the order in which consumer.internals.Fetcher 
receives the partition messages, as determined by the completedFetches queue in 
that class. To avoid parsing the partition messages repeatedly, we can save those 
parsed fetches to a list and track the next partition to take messages from there.

Does it sound like a good approach? If this is not the right place to discuss 
the design, please let me know where to engage. If this is agreeable, I can 
contribute the implementation.
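
A sketch of the rotation idea, for illustration only (this is a hypothetical
helper, not the actual Fetcher code):

import java.util.ArrayList;
import java.util.List;

public class RoundRobinDrainer<V> {
    private int nextStart = 0;

    // Drain up to maxRecords from the per-partition record lists, starting at a
    // different partition on every call so that no partition is permanently favoured.
    public List<V> drain(List<List<V>> recordsPerPartition, int maxRecords) {
        List<V> result = new ArrayList<>();
        int partitions = recordsPerPartition.size();
        if (partitions == 0) {
            return result;
        }
        int start = nextStart % partitions;
        for (int i = 0; i < partitions && result.size() < maxRecords; i++) {
            List<V> records = recordsPerPartition.get((start + i) % partitions);
            int take = Math.min(records.size(), maxRecords - result.size());
            result.addAll(records.subList(0, take));
        }
        // The next poll begins with the partition after the one we started from this time.
        nextStart = (start + 1) % partitions;
        return result;
    }
}
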



Thanks, CH



Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

2018-10-22 Thread Jun Rao
Hi, Patrick,

There is another thing that may be worth considering.

10. It will be useful to include the czxid also in the ControlledShutdown
request. This way, if the broker has been restarted, the controller can
ignore an old ControlledShutdown request(e.g., due to retries). This will
prevent the restarted broker from incorrectly stopping replicas.

Thanks,

Jun


On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang 
wrote:

> Hi Jun,
>
> Thanks a lot for the comments.
>
> 1. czxid is globally unique and monotonically increasing based on the
> zookeeper doc.
> References (from
> https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> "Every change to the ZooKeeper state receives a stamp in the form of a
> *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> smaller than zxid2 then zxid1 happened before zxid2."
> "czxid: The zxid of the change that caused this znode to be created."
>
> 2. You are right. There will be only one broker change event fired in the
> case I mentioned because we will not register the watcher before the read.
>
> 3. Let's say we want to initialize the states of broker set A and we want
> the cluster to be aware of the absence of broker set B. The currently live
> broker set in the cluster is C.
>
> From the design point of view, here are the rules for broker state
> transition:
> - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
> of B for onBrokerFailure().
> - When processing onBrokerStartup(), we use the broker generation
> controller read from zk to send requests to broker set A and use the
> previously cached broker generation to send requests to (C-A).
> - When processing onBrokerFailure(), we use the previously cached
> broker generation to send requests to C.
>
> From the implementation point of view, here are the steps we need to
> follow when processing BrokerChangeEvent:
> -  Reads all child nodes in /brokers/ids/ to get current brokers with
> broker generation
> -  Detect new brokers, dead brokers and bounced brokers
> -  Update the live broker ids in controller context
> -  Update broker generations for new brokers in controller context
> -  Invoke onBrokerStartup(new brokers)
> -  Invoke onBrokerFailure(bounced brokers)
> -  Update broker generations for bounce brokers in controller context
> -  Invoke onBrokerStartup(bounced brokers)
> -  Invoke onBrokerFailure(dead brokers)
> We can further optimize the flow by avoiding sending requests to a
> broker if its broker generation is larger than the one in the controller
> context.
>
> I will also update the KIP to clarify how it works for BrokerChangeEvent
> processing in more detail.
>
> Thanks,
> Patrick
>
>
>
> On Thu, Oct 11, 2018 at 12:12 PM Jun Rao  wrote:
>
> > Hi, Patrick,
> >
> > Thanks for the KIP. Looks good to me overall and very useful. A few
> > comments below.
> >
> > 1. "will reject the requests with smaller broker generation than its
> > current generation." Is czxid monotonically increasing?
> >
> > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > watchers are one-time watchers. Once a watcher is fired, one needs to
> > register it again before the watcher can be triggered. So, when the
> > controller is busy and a broker goes down and comes up, the first event
> > will trigger the ZK watcher. Since the controller is busy and hasn't
> > registered the watcher again, the second event actually won't fire. By
> the
> > time the controller reads from ZK, it sees that the broker is still
> > registered and thus thinks that nothing has happened to the broker, which
> > is causing the problem.
> >
> > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
> > Could you clarify the broker list and the broker epoch used when making
> > these calls? We want to prevent the restarted broker from receiving a
> > partial replica list on the first LeaderAndIsr request because of this.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang 
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > detail.
> > >
> > > broker A is initiating a controlled shutdown (restart). The Controller
> > > sends a StopReplicaRequest but it reaches broker A after it has started
> > up
> > > again. He therefore stops replicating those partitions even though he
> > > should just be starting to
> > > This is right.
> > >
> > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > restart.
> > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> therefore
> > > starts leading for the partitions sent by that request and might stop
> > > leading partitions that it was leading previously.
> > > This was well expla

[jira] [Created] (KAFKA-7529) Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted

2018-10-22 Thread Kashyap Ivaturi (JIRA)
Kashyap Ivaturi created KAFKA-7529:
--

 Summary: Kafka Connect JDBC doesn't push new records to Kafka 
Topic unless the connector is restarted
 Key: KAFKA-7529
 URL: https://issues.apache.org/jira/browse/KAFKA-7529
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Kashyap Ivaturi


Hi,

We have a Kafka Connect JDBC Source Connector which keeps polling for new 
records in an Oracle table every minute and pushes the new records to a Kafka 
topic. New records are determined by an incrementing column.

In general everything works well, but once in a while we see that even though 
there were new records with the incrementing column, those records don't get 
pushed to the topic. There is no sign of any error in the logs and the 
connector is in the running state. Only after we restart the connector are the 
new records pushed to the topic.

Any idea in what situation this can happen?

Rgds
Kashyap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-386: Make Min metrics' default value consistent with Max metrics

2018-10-22 Thread Suman B N
Looks good to me. Maintains uniformity. +1 from me.
But it's more of a bug fix than an improvement proposal. Let's see what the
contributors have to say.

On Mon, Oct 22, 2018 at 7:23 PM Stanislav Kozlovski 
wrote:

> Hey everybody,
>
> I've opened up a very short KIP to make the Max and Min metrics' default
> values consistent with each other.
>
> KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics
> JIRA: https://issues.apache.org/jira/browse/KAFKA-7528
>
> This is hopefully a very straightforward change. Please provide feedback.
> Thanks
>
> --
> Best,
> Stanislav
>


-- 
*Suman*
*OlaCabs*


Re: [DISCUSS] KIP-386: Make Min metrics' default value consistent with Max metrics

2018-10-22 Thread Stanislav Kozlovski
Hi Suman,

Thanks for taking a look at this. Yeah, it's very minor but it changes the
public API (even if this is a very slight change) and as far as I know this
warrants some discussion

On Mon, Oct 22, 2018 at 9:28 PM Suman B N  wrote:

> Looks good to me. Maintains uniformity. +1 from me.
> But its more of a bug rather than improvement proposal. Let's see what
> contributors got to say.
>
> On Mon, Oct 22, 2018 at 7:23 PM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Hey everybody,
> >
> > I've opened up a very short KIP to make the Max and Min metrics' default
> > values consistent with each other.
> >
> > KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-7528
> >
> > This is hopefully a very straightforward change. Please provide feedback.
> > Thanks
> >
> > --
> > Best,
> > Stanislav
> >
>
>
> --
> *Suman*
> *OlaCabs*
>


-- 
Best,
Stanislav


RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-22 Thread Pellerin, Clement
I updated the KIP. The proposed interface is now named SslFactory and the 
default implementation is DefaultSslFactory.
I also mention the existing non-default constructors will be removed.
The constants for the new config keys are now declared in the interface itself.
Please give me your feedback on this new version.

-Original Message-
From: Harsha Chintalapani [mailto:ka...@harsha.io] 
Sent: Friday, October 19, 2018 6:51 PM
To: dev@kafka.apache.org; dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

SslFactory is not a public interface for others to use.  EchoServer is for internal 
testing.
We should name these, as proposed in the rejected alternatives, SslFactory and 
DefaultSslFactory.
I don't see anyone using an internal class as a public API.

-Harsha


RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-22 Thread Harsha Chintalapani
Thanks for the changes. KIP looks good to me.

-Harsha
On Oct 22, 2018, 12:57 PM -0700, Pellerin, Clement , 
wrote:
> I updated the KIP. The proposed interface is now named SslFactory and the 
> default implementation is DefaultSslFactory.
> I also mention the existing non-default constructors will be removed.
> The constants for the new config keys are now declared in the interface 
> itself.
> Please give me your feedback on this new version.
>
> -Original Message-
> From: Harsha Chintalapani [mailto:ka...@harsha.io]
> Sent: Friday, October 19, 2018 6:51 PM
> To: dev@kafka.apache.org; dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory
>
> SslFactory is not a public interface for others to use.  EchoServer is 
> internal testing.
> We should make these as proposed in rejected alternatives to SslFactory and 
> DefaultSslFactory.
> I don’t see any one using a internal class as public API.
>
> -Harsha


kafka design patterns

2018-10-22 Thread abeceda4

Hi, can you help me identify design patterns in the apache/kafka code?

Thanks,
Mrkvica



[jira] [Created] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Sophie Qian (JIRA)
Sophie Qian created KAFKA-7530:
--

 Summary: Need to allow overwrite 
ssl.endpoint.identification.algorithm.config
 Key: KAFKA-7530
 URL: https://issues.apache.org/jira/browse/KAFKA-7530
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: Sophie Qian


We are in the process of upgrading our system to use Confluent 5.0.0 (which is 
using Kafka 2.0.0). I found out SslConfigs 
(clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has the 
following change:

KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (#4956)

Make HTTPS the default ssl.endpoint.identification.algorithm.

But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
following values are reconfigurable:

public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
    SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
    SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
    SslConfigs.SSL_KEY_PASSWORD_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);

 

Please make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
reconfigurable.
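
For context, the key can still be overridden statically at startup (an empty 
value disables hostname verification); the request here is to also allow it in 
the dynamically reconfigurable set. A minimal sketch of the static override:

import java.util.Properties;

public class SslOverrideExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Accepted as a static client/broker property at startup; an empty value
        // disables hostname verification. It is not in SslConfigs.RECONFIGURABLE_CONFIGS,
        // so it currently cannot be changed through dynamic reconfiguration.
        props.put("ssl.endpoint.identification.algorithm", "");
    }
}
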

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

2018-10-22 Thread Patrick Huang
Hi Jun,

That is a good point. I want to make sure I understand the scenario you mentioned. 
Correct me if I am wrong. Here is the sequence of events:

  1.  Broker sends ControlledShutdown request 1 to controller
  2.  Broker sends ControlledShutdown request 2 to controller due to retries
  3.  Controller processes ControlledShutdown request 1
  4.  Controller sends control requests to the broker
  5.  Broker receives ControlledShutdown response 1 from controller
  6.  Broker shuts down and restarts quickly
  7.  Controller processes ControlledShutdown request 2
  8.  Controller sends control requests to the broker

It is possible that the controller processes the broker change event between 6) and 
7). In this case, the controller has already updated the cached czxid to the 
up-to-date one, so the bounced broker will not reject the control requests in 8), 
which causes a correctness problem.


Best,
Zhanxiang (Patrick) Huang


From: Jun Rao 
Sent: Monday, October 22, 2018 14:45
To: dev
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced 
brokers using broker generation

Hi, Patrick,

There is another thing that may be worth considering.

10. It will be useful to include the czxid also in the ControlledShutdown
request. This way, if the broker has been restarted, the controller can
ignore an old ControlledShutdown request(e.g., due to retries). This will
prevent the restarted broker from incorrectly stopping replicas.

Thanks,

Jun


On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang 
wrote:

> Hi Jun,
>
> Thanks a lot for the comments.
>
> 1. czxid is globally unique and monotonically increasing based on the
> zookeeper doc.
> References (from
> https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> "Every change to the ZooKeeper state receives a stamp in the form of a
> *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> smaller than zxid2 then zxid1 happened before zxid2."
> "czxid: The zxid of the change that caused this znode to be created."
>
> 2. You are right. There will be only one broker change event fired in the
> case I mentioned because we will not register the watcher before the read.
>
> 3. Let's say we want to initialize the states of broker set A and we want
> the cluster to be aware of the absence of broker set B. The currently live
> broker set in the cluster is C.
>
> From the design point of view, here are the rules for broker state
> transition:
> - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
> of B for onBrokerFailure().
> - When processing onBrokerStartup(), we use the broker generation
> controller read from zk to send requests to broker set A and use the
> previously cached broker generation to send requests to (C-A).
> - When processing onBrokerFailure(), we use the previously cached
> broker generation to send requests to C.
>
> From the implementation point of view, here are the steps we need to
> follow when processing BrokerChangeEvent:
> -  Reads all child nodes in /brokers/ids/ to get current brokers with
> broker generation
> -  Detect new brokers, dead brokers and bounced brokers
> -  Update the live broker ids in controller context
> -  Update broker generations for new brokers in controller context
> -  Invoke onBrokerStartup(new brokers)
> -  Invoke onBrokerFailure(bounced brokers)
> -  Update broker generations for bounce brokers in controller context
> -  Invoke onBrokerStartup(bounced brokers)
> -  Invoke onBrokerFailure(dead brokers)
> We can further optimize the flow by avoiding sending requests to a
> broker if its broker generation is larger than the one in the controller
> context.
>
> I will also update the KIP to clarify how it works for BrokerChangeEvent
> processing in more detail.
>
> Thanks,
> Patrick
>
>
>
> On Thu, Oct 11, 2018 at 12:12 PM Jun Rao  wrote:
>
> > Hi, Patrick,
> >
> > Thanks for the KIP. Looks good to me overall and very useful. A few
> > comments below.
> >
> > 1. "will reject the requests with smaller broker generation than its
> > current generation." Is czxid monotonically increasing?
> >
> > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > watchers are one-time watchers. Once a watcher is fired, one needs to
> > register it again before the watcher can be triggered. So, when the
> > controller is busy and a broker goes down and comes up, the first event
> > will trigger the ZK watcher. Since the controller is busy and hasn't
> > registered the watcher again, the second event actually won't fire. By
> the
> > time the controller reads from ZK, it sees that the broker is still
> > registered and thus thinks that nothing has happened to the broker, which
> > is causing the problem.
> >
> > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > invoke onBrokerSta

RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-22 Thread Pellerin, Clement
If it is up to me, I'd say the KIP is now finalized. Can I call for a [VOTE] or 
I need more feedback?

-Original Message-
From: Harsha Chintalapani [mailto:ka...@harsha.io] 
Sent: Monday, October 22, 2018 4:36 PM
To: dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

Thanks for the changes. KIP looks good to me.

-Harsha
On Oct 22, 2018, 12:57 PM -0700, Pellerin, Clement , 
wrote:
> I updated the KIP. The proposed interface is now named SslFactory and the 
> default implementation is DefaultSslFactory.
> I also mention the existing non-default constructors will be removed.
> The constants for the new config keys are now declared in the interface 
> itself.
> Please give me your feedback on this new version.
>
> -Original Message-
> From: Harsha Chintalapani [mailto:ka...@harsha.io]
> Sent: Friday, October 19, 2018 6:51 PM
> To: dev@kafka.apache.org; dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory
>
> SslFactory is not a public interface for others to use.  EchoServer is 
> internal testing.
> We should make these as proposed in rejected alternatives to SslFactory and 
> DefaultSslFactory.
> I don’t see any one using a internal class as public API.
>
> -Harsha


Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

2018-10-22 Thread Jun Rao
Hi, Patrick,

Yes, that's the general sequence. After step 2, the shutting down broker
can give up the controlled shutdown process and proceed to shut down. When
it's restarted, it could still receive StopReplica requests from the
controller in reaction to the previous controlled shutdown requests. This
could lead the restarted broker to a bad state.

Thanks,

Jun


On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang  wrote:

> Hi Jun,
>
> That is a good point. I want to make it clear about the scenario you
> mentioned. Correct me if I am wrong. Here is the sequence of the event:
>
>1. Broker sends ControlledShutdown request 1 to controller
>2. Broker sends ControlledShutdown request 2 to controller due to
>retries
>3. Controller processes ControlledShutdown request 1
>4. Controller sends control requests to the broker
>5. Broker receives ControlledShutdown response 1 from controller
>6. Broker shuts down and restarts quickly
>7. Controller processes ControlledShutdown request 2
>8. Controller sends control requests to the broker
>
> It is possible that the controller processes the broker change event between
> 6) and 7). In this case, the controller has already updated the cached czxid to the
> up-to-date one, so the bounced broker will not reject the control requests in
> 8), which causes a correctness problem.
>
>
> Best,
> Zhanxiang (Patrick) Huang
>
> --
> *From:* Jun Rao 
> *Sent:* Monday, October 22, 2018 14:45
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> There is another thing that may be worth considering.
>
> 10. It will be useful to include the czxid also in the ControlledShutdown
> request. This way, if the broker has been restarted, the controller can
> ignore an old ControlledShutdown request(e.g., due to retries). This will
> prevent the restarted broker from incorrectly stopping replicas.
>
> Thanks,
>
> Jun
>
>
> On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang 
> wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments.
> >
> > 1. czxid is globally unique and monotonically increasing based on the
> > zookeeper doc.
> > References (from
> > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > "Every change to the ZooKeeper state receives a stamp in the form of a
> > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> > changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> > smaller than zxid2 then zxid1 happened before zxid2."
> > "czxid: The zxid of the change that caused this znode to be created."
> >
> > 2. You are right. There will be only one broker change event fired in the
> > case I mentioned because we will not register the watcher before the
> read.
> >
> > 3. Let's say we want to initialize the states of broker set A and we want
> > the cluster to be aware of the absence of broker set B. The currently
> live
> > broker set in the cluster is C.
> >
> > From the design point of view, here are the rules for broker state
> > transition:
> > - Pass in broker ids of A for onBrokerStartup() and pass in broker
> ids
> > of B for onBrokerFailure().
> > - When processing onBrokerStartup(), we use the broker generation
> > controller read from zk to send requests to broker set A and use the
> > previously cached broker generation to send requests to (C-A).
> > - When processing onBrokerFailure(), we use the previously cached
> > broker generation to send requests to C.
> >
> > From the implementation point of view, here are the steps we need to
> > follow when processing BrokerChangeEvent:
> > -  Reads all child nodes in /brokers/ids/ to get current brokers with
> > broker generation
> > -  Detect new brokers, dead brokers and bounced brokers
> > -  Update the live broker ids in controller context
> > -  Update broker generations for new brokers in controller context
> > -  Invoke onBrokerStartup(new brokers)
> > -  Invoke onBrokerFailure(bounced brokers)
> > -  Update broker generations for bounce brokers in controller context
> > -  Invoke onBrokerStartup(bounced brokers)
> > -  Invoke onBrokerFailure(dead brokers)
> > We can further optimize the flow by avoiding sending requests to a
> > broker if its broker generation is larger than the one in the controller
> > context.
> >
> > I will also update the KIP to clarify how it works for BrokerChangeEvent
> > processing in more detail.
> >
> > Thanks,
> > Patrick
> >
> >
> >
> > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao  wrote:
> >
> > > Hi, Patrick,
> > >
> > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > comments below.
> > >
> > > 1. "will reject the requests with smaller broker generation than its
> > > current generation." Is czxid monotonically increasing?
> > >
> > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > w

Re: [DISCUSS] KIP-351: Add --under-min-isr option to describe topics command

2018-10-22 Thread Kevin Lu
Hi Mickael,

Thanks for the suggestion for getting the "computed" configuration from
AdminClient.describeConfigs(). That's exactly what I was looking for!

I have updated the KIP to use AdminClient.describeConfigs(), and included a
code snippet. Please take a look.

Since KIP-377 proposes the same bootstrap-server option, I've put a
dependency for this KIP to have KIP-377 implemented first so we don't
duplicate work or introduce conflicts. I'll give the other thread a bump,
but I think we can still continue discussion/voting for this KIP.

Thoughts?

Regards,
Kevin

On Mon, Oct 22, 2018 at 4:01 AM Mickael Maison 
wrote:

> It might be worth syncing with KIP-377 which is already planning to
> make TopicCommand use the AdminClient and add a --bootstrap-server
> argument.
>
> Also in the proposed changes section, you mention the challenge of
> finding the topic min ISR configuration. Using the
> AdminClient.describeConfigs() API,
> you directly get the "computed" configuration for topics. If the topic
> is using the default config from the broker the configuration source
> will be set to "DEFAULT_CONFIG". In case, the configuration was
> specified during creation, the source will be set to
> "DYNAMIC_TOPIC_CONFIG". So there's no need to query Zookeeper.
> On Fri, Oct 19, 2018 at 5:02 PM Kevin Lu  wrote:
> >
> > Bumping this as I have added some additional details.
> >
> > This change will require adding a "--bootstrap-server" flag to identify
> the
> > current broker/cluster configured "min.insync.replicas".
> >
> > Regards,
> > Kevin
> >
> > On Fri, Oct 12, 2018 at 4:19 PM Kevin Lu  wrote:
> >
> > > Hi All,
> > >
> > > After some feedback, I have reformulated KIP-351
> > > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
> >
> > > .
> > >
> > > This KIP proposes an additional "--under-min-isr" option in
> TopicCommand
> > > to show topic partitions which are under the configured
> > > "min.insync.replicas" to help operators identify which topic partitions
> > > need immediate fixing.
> > >
> > > Please take a look and provide some feedback!
> > >
> > > Thanks!
> > >
> > > Regards,
> > > Kevin
> > >
>


Re: [DISCUSS] KIP-377: TopicCommand to use AdminClient

2018-10-22 Thread Kevin Lu
Hi Viktor,

+1 to this KIP.

I would very much like to see AdminClient in TopicCommand. This would also
allow us to efficiently implement new features like the "--under-min-isr"
option I proposed in KIP-351

.

Thanks.

Regards,
Kevin

On Sat, Oct 20, 2018 at 10:52 PM Colin McCabe  wrote:

> Hi Viktor,
>
> Sounds good.  If you want to propose a way of improving the metadata
> protocol so that "[deleted]" could be supported, you could probably create
> that KIP in parallel.
>
> The last KIP in that area that I can remember is KIP-142, which didn't get
> adopted (yet?)
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-142%3A+Add+ListTopicsRequest+to+efficiently+list+all+the+topics+in+a+cluster
>
> There have been other discussions though.  In general there are a lot of
> features that would be nice to have in the metadata protocol (pagination,
> regexes, skip stuff we don't need).
>
> best,
> Colin
>
>
> On Tue, Oct 16, 2018, at 10:11, Viktor Somogyi-Vass wrote:
> > Hi Colin,
> >
> > Thanks, it makes sense and simplifies this KIP tremendously. I'll move
> this
> > section to the rejected alternatives with a note that KIP-142 will have
> > this feature.
> > On a similar note: is there a KIP for describe topics protocol or have
> you
> > been thinking about it? I guess there it's the same problem, we often
> don't
> > want to forward the entire metadata.
> >
> > Viktor
> >
> > On Fri, Oct 12, 2018 at 12:03 PM Colin McCabe 
> wrote:
> >
> > > Hi Viktor,
> > >
> > > Thanks for bumping this thread.
> > >
> > > I think we should just focus on transitioning the TopicCommand to use
> > > AdminClient, and talk about protocol changes in a separate KIP.
> Protocol
> > > changes often involve a lot of discussion.  This does mean that we
> couldn't
> > > implement the "list topics under deletion" feature when using
> AdminClient
> > > at the moment.  We could add a note to the tool output indicating this.
> > >
> > > We should move the protocol discussion to a separate thread.  Probably
> > > also look at KIP-142 as well.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Oct 9, 2018, at 07:45, Viktor Somogyi-Vass wrote:
> > > > Hi All,
> > > >
> > > > Would like to bump this as the conversation sank a little bit, but
> more
> > > > importantly I'd like to validate my plans/ideas on extending the
> Metadata
> > > > protocol. I was thinking about two other alternatives, namely:
> > > > 1. Create a ListTopicUnderDeletion protocol. This however would be
> > > > unnecessary: it'd have one very narrow functionality which we can't
> > > extend.
> > > > It'd make sense to have a list topics or describe topics protocol
> where we
> > > > can list/describe topics under deletion but for normal
> listing/describing
> > > > we already use the metadata, so it would be a duplication of
> > > functionality.
> > > > 2. DeleteTopicsResponse could return the topics under deletion if the
> > > > request's argument list is empty which might make sense at the first
> > > look,
> > > > but actually we'd mix the query functionality with the delete
> > > functionality
> > > > which is counterintuitive.
> > > >
> > > > Even though most clients won't need these "limbo" topics (which are
> under
> > > > deletion) in the foreseeable future, it can be considered as part of
> the
> > > > cluster state or metadata and to me it makes sense. Also it doesn't
> have
> > > a
> > > > big overhead in the response size as typically users don't delete
> topics
> > > > too often as far as I experienced.
> > > >
> > > > I'd be happy to receive some ideas/feedback on this.
> > > >
> > > > Cheers,
> > > > Viktor
> > > >
> > > >
> > > > On Fri, Sep 28, 2018 at 4:51 PM Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I made an update to the KIP. Just in short:
> > > > > Currently KafkaAdminClient.describeTopics() and
> > > > > KafkaAdminClient.listTopics() uses the Metadata protocol to acquire
> > > topic
> > > > > information. The returned response however won't contain the topics
> > > that
> > > > > are under deletion but couldn't complete yet (for instance because
> of
> > > some
> > > > > replicas offline), therefore it is not possible to implement the
> > > current
> > > > > command's "marked for deletion" feature. To get around this I
> > > introduced
> > > > > some changes in the Metadata protocol.
> > > > >
> > > > > Thanks,
> > > > > Viktor
> > > > >
> > > > > On Fri, Sep 28, 2018 at 4:48 PM Viktor Somogyi-Vass <
> > > > > viktorsomo...@gmail.com> wrote:
> > > > >
> > > > >> Hi Mickael,
> > > > >>
> > > > >> Thanks for the feedback, I also think that many customers wanted
> this
> > > for
> > > > >> a long time.
> > > > >>
> > > > >> Cheers,
> > > > >> Viktor
> > > > >>
> > > > >> On Fri, Sep 28, 2018 at 11:45 AM Mickael Maison <
> > > mickael.mai...@gmail.com>
> >

Re: [DISCUSS] KIP-386: Make Min metrics' default value consistent with Max metrics

2018-10-22 Thread John Roesler
Hi Stanislav,
Thanks for this KIP. I coincidentally just noticed these strange initial
values while doing some performance measurements.

Since the metric type is a double, could we consider NaN instead? It seems
like +Inf is somewhat arbitrary for Max, so it might as well be an
arbitrary value that actually means "this is not a number".

Consider that +- Infinity is technically "in range" for max and min, while
NaN is not. NaN is "in range" for an average or a rate, but in those cases
it would mean that the result is over 0 samples or 0 time, respectively. I
think this only happens when nothing has been recorded, so it would still
be sound for the situation you're attempting to address.
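
A small sketch of that distinction with plain Java doubles (nothing
Kafka-specific):

public class MinMaxDefaultDemo {
    public static void main(String[] args) {
        // +/- infinity still order against real samples, so they are "in range" for max/min:
        System.out.println(Math.max(Double.NEGATIVE_INFINITY, 3.0)); // 3.0
        System.out.println(Math.min(Double.POSITIVE_INFINITY, 3.0)); // 3.0
        // NaN does not order against anything, so it can only mean "nothing was recorded":
        System.out.println(Double.NaN > 3.0);        // false
        System.out.println(Double.NaN < 3.0);        // false
        System.out.println(Double.isNaN(0.0 / 0.0)); // true, e.g. an average over zero samples
    }
}
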

Just to throw it out there, `null` is technically also (maybe moreso) a
sound choice, but I'd be concerned about causing a bunch of null
dereference errors.

Thanks again,
-John

On Mon, Oct 22, 2018 at 2:27 PM Stanislav Kozlovski 
wrote:

> Hi Suman,
>
> Thanks for taking a look at this. Yeah, it's very minor but it changes the
> public API (even if this is a very slight change) and as far as I know this
> warrants some discussion
>
> On Mon, Oct 22, 2018 at 9:28 PM Suman B N  wrote:
>
> > Looks good to me. Maintains uniformity. +1 from me.
> > But it's more of a bug fix than an improvement proposal. Let's see what
> > the contributors have to say.
> >
> > On Mon, Oct 22, 2018 at 7:23 PM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > wrote:
> >
> > > Hey everybody,
> > >
> > > I've opened up a very short KIP to make the Max and Min metrics'
> default
> > > values consistent with each other.
> > >
> > > KIP:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics
> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-7528
> > >
> > > This is hopefully a very straightforward change. Please provide
> feedback.
> > > Thanks
> > >
> > > --
> > > Best,
> > > Stanislav
> > >
> >
> >
> > --
> > *Suman*
> > *OlaCabs*
> >
>
>
> --
> Best,
> Stanislav
>


Build failed in Jenkins: kafka-1.1-jdk7 #223

2018-10-22 Thread Apache Jenkins Server
See 


Changes:

[junrao] KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)

--
[...truncated 425.68 KB...]
kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotClearAnythingIfOffsetToFirstOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldEnforceOffsetsIncreaseMonotonically STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldEnforceOffsetsIncreaseMonotonically PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldEnforceMonotonicallyIncreasingStartOffsets STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldEnforceMonotonicallyIncreasingStartOffsets PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica 
STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldNotAllowDivergentLogs STARTED

k

Build failed in Jenkins: kafka-trunk-jdk8 #3155

2018-10-22 Thread Apache Jenkins Server
See 


Changes:

[mjsax] MINOR: Prohibit setting StreamsConfig commit.interval.ms to a negative

--
[...truncated 2.83 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.k