[jira] [Created] (KAFKA-7068) ConfigTransformer doesn't handle null values

2018-06-17 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7068:
--

 Summary: ConfigTransformer doesn't handle null values
 Key: KAFKA-7068
 URL: https://issues.apache.org/jira/browse/KAFKA-7068
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Magesh kumar Nandakumar
 Fix For: 2.0.0


ConfigTransformer fails with an NPE when the input configs have keys with 
null values. This is a blocker for 2.0.0 since connector configs can have 
null values.
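For illustration, a minimal sketch of the failure mode and a guard that avoids it (class and helper names are hypothetical, not the actual patch):

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeTransformSketch {
    // Sketch of a null-safe pass over connector configs: a null value cannot
    // contain a ${provider:path:key} variable reference, so it should be
    // passed through unchanged instead of being dereferenced (the NPE).
    public static Map<String, String> transform(Map<String, String> configs) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            String value = entry.getValue();
            result.put(entry.getKey(), value == null ? null : resolveVariables(value));
        }
        return result;
    }

    // Hypothetical stand-in for the real variable-resolution logic.
    private static String resolveVariables(String value) {
        return value;
    }
}
```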



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-0.10.2-jdk7 #216

2018-06-17 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7058: Comparing schema default values using Objects#deepEquals()

--
[...truncated 1.31 MB...]
org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testStateChangeStartClose STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testStateChangeStartClose PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState
 STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState
 PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testPartitionAssignmentChange STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testPartitionAssignmentChange PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeClean 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeClean 
PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldReleaseStateDirLockIfFailureOnTaskCloseForUnassignedSuspendedTask STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldReleaseStateDirLockIfFailureOnTaskCloseForUnassignedSuspendedTask PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMetrics 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMetrics 
PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks
 STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks
 PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown PASSED

org.apache.kafka.streams.processor.internals.StreamsKafkaClientTest > 
testConfigFromStreamsConfig STARTED

org.apache.kafka.streams.processor.internals.StreamsKafkaClientTest > 
testConfigFromStreamsConfig PASSED

org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestampTest > 
logAndSkipOnInvalidTimestamp STARTED

org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestampTest > 
logAndSkipOnInvalidTimestamp PASSED

org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestampTest > 
extractMetadataTimestamp STARTED

org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestampTest > 
extractMetadataTimestamp PASSED

org.apache.kafka.streams.processor.FailOnInvalidTimestampTest > 
failOnInvalidTimestamp STARTED

org.apache.kafka.streams.processor.FailOnInvalidTimestampTest > 
failOnInvalidTimestamp PASSED

org.apache.kafka.streams.processor.FailOnInvalidTimestampTest > 
extractMetadataTimestamp STARTED

org.apache.kafka.streams.processor.FailOnInvalidTimestampTest > 
extractMetadataTimestamp PASSED

org.apache.kafka.streams.processor.WallclockTimestampExtractorTest > 
extractSystemTimestamp STARTED

org.apache.kafka.streams.processor.WallclockTimestampExtractorTest > 
extractSystemTimestamp PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorSupplier STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorSupplier PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotSetApplicationIdToNull STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotSetApplicationIdToNull PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testSourceTopics 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testSourceTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCompactAndDelete

Jenkins build is back to normal : kafka-0.10.0-jdk7 #221

2018-06-17 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-0.11.0-jdk7 #382

2018-06-17 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7058: Comparing schema default values using Objects#deepEquals()

--
[...truncated 903.89 KB...]

kafka.producer.ProducerTest > testSendToNewTopic STARTED

kafka.producer.ProducerTest > testSendToNewTopic PASSED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout STARTED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout PASSED

kafka.producer.ProducerTest > testSendNullMessage STARTED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.producer.SyncProducerTest > testReachableServer STARTED

kafka.producer.SyncProducerTest > testReachableServer PASSED

kafka.producer.SyncProducerTest > testMessageSizeTooLarge STARTED

kafka.producer.SyncProducerTest > testMessageSizeTooLarge PASSED

kafka.producer.SyncProducerTest > testNotEnoughReplicas STARTED

kafka.producer.SyncProducerTest > testNotEnoughReplicas PASSED

kafka.producer.SyncProducerTest > testMessageSizeTooLargeWithAckZero STARTED

kafka.producer.SyncProducerTest > testMessageSizeTooLargeWithAckZero PASSED

kafka.producer.SyncProducerTest > testProducerCanTimeout STARTED

kafka.producer.SyncProducerTest > testProducerCanTimeout PASSED

kafka.producer.SyncProducerTest > testProduceRequestWithNoResponse STARTED

kafka.producer.SyncProducerTest > testProduceRequestWithNoResponse PASSED

kafka.producer.SyncProducerTest > testEmptyProduceRequest STARTED

kafka.producer.SyncProducerTest > testEmptyProduceRequest PASSED

kafka.producer.SyncProducerTest > testProduceCorrectlyReceivesResponse STARTED

kafka.producer.SyncProducerTest > testProduceCorrectlyReceivesResponse PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.api.LogAppendTimeTest > testProduceConsume STARTED

kafka.api.LogAppendTimeTest > testProduceConsume PASSED

kafka.api.FetchRequestTest > testShuffleWithSingleTopic STARTED

kafka.api.FetchRequestTest > testShuffleWithSingleTopic PASSED

kafka.api.FetchRequestTest > testShuffle STARTED

kafka.api.FetchRequestTest > testShuffle PASSED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic STARTED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic PASSED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime STARTED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime PASSED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero STARTED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero PASSED

kafka.api.PlaintextProducerSendTest > testWrongSerializer STARTED

kafka.api.PlaintextProducerSendTest > testWrongSerializer PASSED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > 

[jira] [Resolved] (KAFKA-7068) ConfigTransformer doesn't handle null values

2018-06-17 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7068.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Issue resolved by pull request 5241
[https://github.com/apache/kafka/pull/5241]

> ConfigTransformer doesn't handle null values
> 
>
> Key: KAFKA-7068
> URL: https://issues.apache.org/jira/browse/KAFKA-7068
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.0, 2.1.0
>
>
> ConfigTransformer fails with an NPE when the input configs have keys with null 
> values. This is a blocker for 2.0.0 since connector configs can have null 
> values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk10 #227

2018-06-17 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7068: Handle null config values during transform (KIP-297)

--
[...truncated 1.57 MB...]
kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
PASSED

kafka.coord

Re: [VOTE] KIP-280: Enhanced log compaction

2018-06-17 Thread Matthias J. Sax
Let me rephrase your answer to make sure I understand what you suggest:

If compaction strategy is configured to use "offset", and if there is a
header in the record with `key == offset`, then we should use the value
of the record header instead of the actual record offset?

Do I understand this correctly? If yes, what is the advantage of doing
this? From my point of view, it might be problematic, because if user A
creates a topic and configures "offset" compaction (with the intent that
the record offset should be used), then a second user B can add a header
with key "offset" and thus break the intention of user A.

Also, if existing topics might have data with record header key
"offset", the change would not be backward compatible either.


-Matthias
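
To make the ambiguity concrete, here is a hedged sketch using the public
`Headers` API (the class name and byte values are illustrative, not part of
the KIP):

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class OffsetHeaderShadowSketch {
    public static void main(String[] args) {
        // User B attaches a header whose key collides with the built-in strategy name.
        RecordHeaders headers = new RecordHeaders();
        headers.add("offset", new byte[] {0, 0, 0, 42});

        // Under the "header wins" reading, this lookup succeeds, and the
        // offset-based compaction that user A configured is silently bypassed.
        Header shadow = headers.lastHeader("offset");
        System.out.println("built-in strategy shadowed: " + (shadow != null));
    }
}
```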

On 6/16/18 6:59 PM, Ted Yu wrote:
> Pardon the brevity in my previous reply.
> I was talking about this bullet:
> 
> bq. When this configuration is set to anything other than "*offset*" or "
> *timestamp*", then the record headers are scanned for a key matching this
> value.
> 
> My point is that if a matching key in the header is found, its value should
> take precedence over the value of the configuration.
> I understand that such an interpretation may have a slight performance cost.
> 
> Cheers
> 
> On Sat, Jun 16, 2018 at 6:29 PM, Matthias J. Sax 
> wrote:
> 
>> Ted,
>>
>> I am also not sure what you mean by "Shouldn't the selection in header
>> have higher precedence over the configuration"? What selection do you
>> mean? And what configuration?
>>
>>
>> About the first point, I think this is actually a valid concern: To
>> address this issue, it seems that we would need to change the accepted
>> format of the config. Instead of "offset", "timestamp", "<header-key>",
>> we could replace the last one with "header=<header-key>".
>>
>> WDYT?
>>
>>
>> -Matthias
>>
>> On 6/15/18 3:06 AM, Ted Yu wrote:
>>> If selection exists in header, the selection should override the config
>> value.
>>> Cheers
>>>  Original message From: Luis Cabral
>>  Date: 6/15/18  1:40 AM  (GMT-08:00) To:
>> dev@kafka.apache.org Subject: Re: [VOTE] KIP-280: Enhanced log compaction
>>> Hi,
>>>
>>> bq. Can the value be determined now ? My thinking is that what if there
>> is a third compaction strategy proposed in the future ? We should guard
>> against user unknowingly choosing the 'future' strategy.
>>>
>>> The idea is that the header name to use is flexible, which protects
>> current clients that may want to use this from having to adapt their
>> already existing header names (they can just specify a new name).
>>>
>>> bq. Shouldn't the selection in header have higher precedence over the
>> configuration ?
>>>
>>> Not sure what you mean here, could you clarify?
>>>
>>> bq. Please create JIRA if you haven't already.
>>>
>>> Done: https://issues.apache.org/jira/browse/KAFKA-7061
>>>
>>> Cheers,
>>> Luís
>>>
 On 11 Jun 2018, at 01:50, Ted Yu  wrote:

 bq. When this configuration is set to anything other than "*offset*" or
>> "
 *timestamp*", then the record headers are scanned for a key matching
>> this
 value.

 Can the value be determined now ? My thinking is that what if there is a
 third compaction strategy proposed in the future ? We should guard
>> against
 user unknowingly choosing the 'future' strategy.

 bq. If this header is found

 Shouldn't the selection in header have higher precedence over the
>> configuration
 ?

 Please create JIRA if you haven't already.

 Thanks

 On Sat, Jun 9, 2018 at 12:39 AM, Luís Cabral
>> 
 wrote:

> Hi all,
>
> Any takers on having a look at this KIP and voting on it?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 280%3A+Enhanced+log+compaction
>
> Cheers,
> Luis
>
>>>
>>
>>
> 





Re: [VOTE] KIP-280: Enhanced log compaction

2018-06-17 Thread Ted Yu
My previous reply was just an alternative for consideration.

bq. then a second user B can add a header with key "offset" and thus break
the intention of user A

I didn't see such a scenario after reading the KIP. Maybe add this as
reasoning for the current approach?

I wonder how user B gets to know the intention of user A. Meaning, if user
B doesn't follow the norm set by user A, there would still be an issue, right?


On Sun, Jun 17, 2018 at 4:58 PM, Matthias J. Sax 
wrote:

> Let me rephrase your answer to make sure I understand what you suggest:
>
> If compaction strategy is configured to use "offset", and if there is a
> header in the record with `key == offset`, then we should use the value
> of the record header instead of the actual record offset?
>
> Do I understand this correctly? If yes, what is the advantage of doing
> this? From my point of view, it might be problematic, because if user A
> creates a topic and configures "offset" compaction (with the intent that
> the record offset should be used), then a second user B can add a header
> with key "offset" and thus break the intention of user A.
>
> Also, if existing topics might have data with record header key
> "offset", the change would not be backward compatible either.
>
>
> -Matthias
>
> On 6/16/18 6:59 PM, Ted Yu wrote:
> > Pardon the brevity in my previous reply.
> > I was talking about this bullet:
> >
> > bq. When this configuration is set to anything other than "*offset*" or "
> > *timestamp*", then the record headers are scanned for a key matching this
> > value.
> >
> > My point is that if a matching key in the header is found, its value should
> > take precedence over the value of the configuration.
> > I understand that such an interpretation may have a slight performance cost.
> >
> > Cheers
> >
> > On Sat, Jun 16, 2018 at 6:29 PM, Matthias J. Sax 
> > wrote:
> >
> >> Ted,
> >>
> >> I am also not sure what you mean by "Shouldn't the selection in header
> >> have higher precedence over the configuration"? What selection do you
> >> mean? And what configuration?
> >>
> >>
> >> About the first point, I think this is actually a valid concern: To
> >> address this issue, it seems that we would need to change the accepted
> >> format of the config. Instead of "offset", "timestamp", "<header-key>",
> >> we could replace the last one with "header=<header-key>".
> >>
> >> WDYT?
> >>
> >>
> >> -Matthias
> >>
> >> On 6/15/18 3:06 AM, Ted Yu wrote:
> >>> If selection exists in header, the selection should override the config
> >> value.
> >>> Cheers
> >>>  Original message From: Luis Cabral
> >>  Date: 6/15/18  1:40 AM  (GMT-08:00) To:
> >> dev@kafka.apache.org Subject: Re: [VOTE] KIP-280: Enhanced log
> compaction
> >>> Hi,
> >>>
> >>> bq. Can the value be determined now ? My thinking is that what if there
> >> is a third compaction strategy proposed in the future ? We should guard
> >> against user unknowingly choosing the 'future' strategy.
> >>>
> >>> The idea is that the header name to use is flexible, which protects
> >> current clients that may want to use this from having to adapt their
> >> already existing header names (they can just specify a new name).
> >>>
> >>> bq. Shouldn't the selection in header have higher precedence over the
> >> configuration ?
> >>>
> >>> Not sure what you mean here, could you clarify?
> >>>
> >>> bq. Please create JIRA if you haven't already.
> >>>
> >>> Done: https://issues.apache.org/jira/browse/KAFKA-7061
> >>>
> >>> Cheers,
> >>> Luís
> >>>
>  On 11 Jun 2018, at 01:50, Ted Yu  wrote:
> 
>  bq. When this configuration is set to anything other than "*offset*"
> or
> >> "
>  *timestamp*", then the record headers are scanned for a key matching
> >> this
>  value.
> 
>  Can the value be determined now ? My thinking is that what if there
> is a
>  third compaction strategy proposed in the future ? We should guard
> >> against
>  user unknowingly choosing the 'future' strategy.
> 
>  bq. If this header is found
> 
>  Shouldn't the selection in header have higher precedence over the
> >> configuration
>  ?
> 
>  Please create JIRA if you haven't already.
> 
>  Thanks
> 
>  On Sat, Jun 9, 2018 at 12:39 AM, Luís Cabral
> >> 
>  wrote:
> 
> > Hi all,
> >
> > Any takers on having a look at this KIP and voting on it?
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 280%3A+Enhanced+log+compaction
> >
> > Cheers,
> > Luis
> >
> >>>
> >>
> >>
> >
>
>


Re: [VOTE] KIP-280: Enhanced log compaction

2018-06-17 Thread Guozhang Wang
I think refactoring the value `header-key` to `header=<header-key>` is a
better idea, to allow users to specify a header key which happens to have
the same name as either `offset` or `timestamp`.


Guozhang
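
What parsing the proposed `header=<header-key>` format might look like, as a
hedged sketch (illustrative names only, not the KIP's actual implementation):

```java
// Parses the proposed compaction-strategy config value:
// "offset" | "timestamp" | "header=<header-key>".
public final class CompactionStrategySketch {
    public enum Kind { OFFSET, TIMESTAMP, HEADER }

    public final Kind kind;
    public final String headerKey; // non-null only when kind == HEADER

    private CompactionStrategySketch(Kind kind, String headerKey) {
        this.kind = kind;
        this.headerKey = headerKey;
    }

    public static CompactionStrategySketch parse(String configValue) {
        if ("offset".equals(configValue)) {
            return new CompactionStrategySketch(Kind.OFFSET, null);
        }
        if ("timestamp".equals(configValue)) {
            return new CompactionStrategySketch(Kind.TIMESTAMP, null);
        }
        if (configValue != null && configValue.startsWith("header=")) {
            // A header literally named "offset" or "timestamp" is still
            // selectable here, because the prefix marks it unambiguously
            // as a header key rather than a built-in strategy.
            return new CompactionStrategySketch(Kind.HEADER,
                    configValue.substring("header=".length()));
        }
        throw new IllegalArgumentException("Unknown compaction strategy: " + configValue);
    }
}
```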

On Sun, Jun 17, 2018 at 5:36 PM, Ted Yu  wrote:

> My previous reply was just an alternative for consideration.
>
> bq. then a second user B can add a header with key "offset" and thus break
> the intention of user A
>
> I didn't see such a scenario after reading the KIP. Maybe add this as
> reasoning for the current approach?
>
> I wonder how user B gets to know the intention of user A. Meaning, if user
> B doesn't follow the norm set by user A, there would still be an issue,
> right?
>
>
> On Sun, Jun 17, 2018 at 4:58 PM, Matthias J. Sax 
> wrote:
>
> > Let me rephrase your answer to make sure I understand what you suggest:
> >
> > If compaction strategy is configured to use "offset", and if there is a
> > header in the record with `key == offset`, then we should use the value
> > of the record header instead of the actual record offset?
> >
> > Do I understand this correctly? If yes, what is the advantage of doing
> > this? From my point of view, it might be problematic, because if user A
> > creates a topic and configures "offset" compaction (with the intent that
> > the record offset should be used), then a second user B can add a header
> > with key "offset" and thus break the intention of user A.
> >
> > Also, if existing topics might have data with record header key
> > "offset", the change would not be backward compatible either.
> >
> >
> > -Matthias
> >
> > On 6/16/18 6:59 PM, Ted Yu wrote:
> > > Pardon the brevity in my previous reply.
> > > I was talking about this bullet:
> > >
> > > bq. When this configuration is set to anything other than "*offset*"
> or "
> > > *timestamp*", then the record headers are scanned for a key matching
> this
> > > value.
> > >
> > > My point is that if a matching key in the header is found, its value
> > > should take precedence over the value of the configuration.
> > > I understand that such an interpretation may have a slight performance cost.
> > >
> > > Cheers
> > >
> > > On Sat, Jun 16, 2018 at 6:29 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >> Ted,
> > >>
> > >> I am also not sure what you mean by "Shouldn't the selection in header
> > >> have higher precedence over the configuration"? What selection do you
> > >> mean? And what configuration?
> > >>
> > >>
> > >> About the first point, I think this is actually a valid concern: To
> > >> address this issue, it seems that we would need to change the accepted
> > >> format of the config. Instead of "offset", "timestamp", "<header-key>",
> > >> we could replace the last one with "header=<header-key>".
> > >>
> > >> WDYT?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 6/15/18 3:06 AM, Ted Yu wrote:
> > >>> If selection exists in header, the selection should override the
> config
> > >> value.
> > >>> Cheers
> > >>>  Original message From: Luis Cabral
> > >>  Date: 6/15/18  1:40 AM  (GMT-08:00)
> To:
> > >> dev@kafka.apache.org Subject: Re: [VOTE] KIP-280: Enhanced log
> > compaction
> > >>> Hi,
> > >>>
> > >>> bq. Can the value be determined now ? My thinking is that what if
> there
> > >> is a third compaction strategy proposed in the future ? We should
> guard
> > >> against user unknowingly choosing the 'future' strategy.
> > >>>
> > >>> The idea is that the header name to use is flexible, which protects
> > >> current clients that may want to use this from having to adapt their
> > >> already existing header names (they can just specify a new name).
> > >>>
> > >>> bq. Shouldn't the selection in header have higher precedence over the
> > >> configuration ?
> > >>>
> > >>> Not sure what you mean here, could you clarify?
> > >>>
> > >>> bq. Please create JIRA if you haven't already.
> > >>>
> > >>> Done: https://issues.apache.org/jira/browse/KAFKA-7061
> > >>>
> > >>> Cheers,
> > >>> Luís
> > >>>
> >  On 11 Jun 2018, at 01:50, Ted Yu  wrote:
> > 
> >  bq. When this configuration is set to anything other than "*offset*"
> > or
> > >> "
> >  *timestamp*", then the record headers are scanned for a key matching
> > >> this
> >  value.
> > 
> >  Can the value be determined now ? My thinking is that what if there
> > is a
> >  third compaction strategy proposed in the future ? We should guard
> > >> against
> >  user unknowingly choosing the 'future' strategy.
> > 
> >  bq. If this header is found
> > 
> >  Shouldn't the selection in header have higher precedence over the
> > >> configuration
> >  ?
> > 
> >  Please create JIRA if you haven't already.
> > 
> >  Thanks
> > 
> >  On Sat, Jun 9, 2018 at 12:39 AM, Luís Cabral
> > >> 
> >  wrote:
> > 
> > > Hi all,
> > >
> > > Any takers on having a look at this KIP and voting on it?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-

Re: [VOTE] KIP-280: Enhanced log compaction

2018-06-17 Thread Matthias J. Sax
Well, for "offset" and "timestamp" policy, not communication between
both is required.

Only if headers are used, user A should communicate the corresponding
header key to user B.


@Luis: can you update the KIP accordingly?



-Matthias

On 6/17/18 5:36 PM, Ted Yu wrote:
> My previous reply was just an alternative for consideration.
> 
> bq. then a second user B can add a header with key "offset" and thus break
> the intention of user A
> 
> I didn't see such a scenario after reading the KIP. Maybe add this as
> reasoning for the current approach?
> 
> I wonder how user B gets to know the intention of user A. Meaning, if user
> B doesn't follow the norm set by user A, there would still be an issue, right?
> 
> 
> On Sun, Jun 17, 2018 at 4:58 PM, Matthias J. Sax 
> wrote:
> 
>> Let me rephrase your answer to make sure I understand what you suggest:
>>
>> If compaction strategy is configured to use "offset", and if there is a
>> header in the record with `key == offset`, then we should use the value
>> of the record header instead of the actual record offset?
>>
>> Do I understand this correctly? If yes, what is the advantage of doing
>> this? From my point of view, it might be problematic, because if user A
>> creates a topic and configures "offset" compaction (with the intent that
>> the record offset should be used), then a second user B can add a header
>> with key "offset" and thus break the intention of user A.
>>
>> Also, if existing topics might have data with record header key
>> "offset", the change would not be backward compatible either.
>>
>>
>> -Matthias
>>
>> On 6/16/18 6:59 PM, Ted Yu wrote:
>>> Pardon the brevity in my previous reply.
>>> I was talking about this bullet:
>>>
>>> bq. When this configuration is set to anything other than "*offset*" or "
>>> *timestamp*", then the record headers are scanned for a key matching this
>>> value.
>>>
>>> My point is that if a matching key in the header is found, its value should
>>> take precedence over the value of the configuration.
>>> I understand that such an interpretation may have a slight performance cost.
>>>
>>> Cheers
>>>
>>> On Sat, Jun 16, 2018 at 6:29 PM, Matthias J. Sax 
>>> wrote:
>>>
 Ted,

 I am also not sure what you mean by "Shouldn't the selection in header
 have higher precedence over the configuration"? What selection do you
 mean? And what configuration?


 About the first point, I think this is actually a valid concern: To
 address this issue, it seems that we would need to change the accepted
 format of the config. Instead of "offset", "timestamp", "<header-key>",
 we could replace the last one with "header=<header-key>".

 WDYT?


 -Matthias

 On 6/15/18 3:06 AM, Ted Yu wrote:
> If selection exists in header, the selection should override the config
 value.
> Cheers
>  Original message From: Luis Cabral
  Date: 6/15/18  1:40 AM  (GMT-08:00) To:
 dev@kafka.apache.org Subject: Re: [VOTE] KIP-280: Enhanced log
>> compaction
> Hi,
>
> bq. Can the value be determined now ? My thinking is that what if there
 is a third compaction strategy proposed in the future ? We should guard
 against user unknowingly choosing the 'future' strategy.
>
> The idea is that the header name to use is flexible, which protects
 current clients that may want to use this from having to adapt their
 already existing header names (they can just specify a new name).
>
> bq. Shouldn't the selection in header have higher precedence over the
 configuration ?
>
> Not sure what you mean here, could you clarify?
>
> bq. Please create JIRA if you haven't already.
>
> Done: https://issues.apache.org/jira/browse/KAFKA-7061
>
> Cheers,
> Luís
>
>> On 11 Jun 2018, at 01:50, Ted Yu  wrote:
>>
>> bq. When this configuration is set to anything other than "*offset*"
>> or
 "
>> *timestamp*", then the record headers are scanned for a key matching
 this
>> value.
>>
>> Can the value be determined now ? My thinking is that what if there
>> is a
>> third compaction strategy proposed in the future ? We should guard
 against
>> user unknowingly choosing the 'future' strategy.
>>
>> bq. If this header is found
>>
>> Shouldn't the selection in header have higher precedence over the
 configuration
>> ?
>>
>> Please create JIRA if you haven't already.
>>
>> Thanks
>>
>> On Sat, Jun 9, 2018 at 12:39 AM, Luís Cabral
 
>> wrote:
>>
>>> Hi all,
>>>
>>> Any takers on having a look at this KIP and voting on it?
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 280%3A+Enhanced+log+compaction
>>>
>>> Cheers,
>>> Luis
>>>
>


>>>
>>
>>
> 





Re: Error in Kafka Stream

2018-06-17 Thread Guozhang Wang
Hello Amandeep,

What file system are you using? Also is `/opt/info` a temp folder that can
be auto-cleared from time to time?


Guozhang
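
If `/opt/info` does turn out to be cleaned periodically, one mitigation is to
point `state.dir` at a persistent location. A minimal sketch, reusing the
application id from the config dump below; the bootstrap servers value is a
placeholder:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UI-Watchlist-ES-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Keep task state (including the .checkpoint files) on a path that
        // no periodic tmp-cleaner touches.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }
}
```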

On Fri, Jun 15, 2018 at 6:39 AM, Amandeep Singh 
wrote:

> Hi,
>
>
>
>  I am getting the below error while processing data with Kafka Streams. The
> application was running for a couple of hours and the
> 'WatchlistUpdate-StreamThread-9' thread was assigned to the same partition
> since the beginning. I am assuming it was able to successfully commit offsets
> for those couple of hours, and the directory
> '/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2'
> did exist for that period.
>
>  And then I started getting the below error every 30 secs (probably
> because of the offset commit interval), and messages are being missed from
> processing.
>
> Can you please help?
>
>
> 2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN o.a.k.s.p.i.ProcessorStateManager:246 - task [0_2] Failed to write checkpoint file to /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint:
>
> java.io.FileNotFoundException: /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
> at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) ~[kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:na]
>
>
> Stream config:
>
> 2018-06-15 08:09:28 [main] INFO  o.a.k.c.consumer.ConsumerConfig:223 -
> ConsumerConfig values:
>
> auto.commit.interval.ms = 5000
>
> auto.offset.reset = earliest
>
> bootstrap.servers = [XYZ]
>
> check.crcs = true
>
> client.id = WatchlistUpdate-StreamThread-9-consumer
>
> connections.max.idle.ms = 54
>
> enable.auto.commit = false
>
> exclude.internal.topics = true
>
> fetch.max.bytes = 52428800
>
> fetch.max.wait.ms = 500
>
> fetch.min.bytes = 1
>
> group.id = UI-Watchlist-ES-App
>
> heartbeat.interval.ms = 3000
>
> interceptor.classes = null
>
> internal.leave.group.on.close = false
>
> isolation.level = read_uncommitted
>
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> max.partition.fetch.bytes = 1048576
>
> max.poll.interval.ms = 2147483647
>
> max.poll.records = 1000
>
> metadata.max.age.ms = 30
>
> metric.reporters = []
>
> metrics.num.samples = 2
>
> metrics.recording.level = INFO
>
> metrics.sample.window.ms = 3
>
> partition.assignment.strategy =
> [org.apache.kafka.streams.processor.

Jenkins build is back to normal : kafka-trunk-jdk10 #228

2018-06-17 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #2747

2018-06-17 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7048 NPE when creating connector (#5202)

--
[...truncated 944.62 KB...]
kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler STARTED

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler PASSED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString STARTED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData STARTED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline STARTED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry STARTED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrott

Re: [DISCUSSION] KIP-300: Add Windowed KTable API in StreamsBuilder

2018-06-17 Thread Guozhang Wang
Thanks for the KIP Boyang, I made a pass over the KIP and the PR and have
some comments:

1. About the public API, I agree with Matthias that we can consider
exposing the `innerDeserializer` and `innerSerializer` in the
Time/SessionWindowSerializer/Deserializer, and the `innerSerde` in `
WrapperSerde` so that users can still pass in a `WindowedSerde` into
Materialized and Consumed. So that we can have the public API as:

```
(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);

(final String topic, final Consumed<Windowed<K>, V> consumed);

(final String topic, final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
```

2. There is another WIP interface change to introduce a WindowedKTable<K, V>
as an alias of KTable<Windowed<K>, V>, for a different purpose of adding
some functions only allowed for windowed tables. I'm wondering whether, with
this interface class, we can work around the Java "method has same erasure"
error while keeping the same function name? This is just a wild thought, and
I think if we ended up adding `Windowed<K>` into the parameters it may not
matter about the signature anyways.


3. This is just a question about your use case: it seems that in your
scenarios you will materialize the window store twice in your topology. The
first time, when you generate the windowed KTable from a windowed aggregation
operator, the aggregation result, i.e. the KTable<Windowed<K>, V>, is already
materialized into a store; then, when you pipe the changelog of this windowed
KTable through an intermediate topic and read from this topic to form a
KTable<Windowed<K>, V>, you will materialize this store again, and the two
materialized state stores will contain completely the same data.
Have you thought about whether you really need to materialize it twice?


Guozhang
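
For reference, a hedged usage sketch of the first overload above, assuming
the KIP's proposed `windowedTable()` existed on `StreamsBuilder` (none of
this is in a released Kafka Streams API; topic name and types are
illustrative):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedTableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Time-windowed key serde; the window size still has to be known by
        // the reader, which is why the KIP discusses passing it explicitly.
        Serde<Windowed<String>> windowedKeySerde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class);
        // Proposed (not yet existing) API: read a changelog topic of windowed
        // aggregation results directly as a windowed KTable.
        KTable<Windowed<String>, Long> counts = builder.windowedTable(
                "counts-changelog",
                Consumed.with(windowedKeySerde, Serdes.Long()));
    }
}
```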



On Sun, Jun 10, 2018 at 3:04 PM, Matthias J. Sax 
wrote:

> Thanks a lot for the KIP. The general idea to allow reading
> windowed KTables is very useful! A couple of initial comments/questions:
>
>
>
> About only adding a single `windowedTable()` with no overloads:
>
>  - retention time is not a mandatory parameter and we can always use the
> default of 1 day
>
>  - instead of enforcing both, Consumed and Materialized, we could also
> extend WindowedSerde to expose its wrapped key-Serde; thus, if a user
> passes in `WindowedSerde` the inner key-serde can be extracted and users
> do not need to pass in the key-Serde explicitly.
>
>  - while I agree that we need to pass in the window-size parameter, I am
> not sure if using Materialized is the best way to do this; it seems that
> window-size is the only mandatory parameter, thus we might be able to
> pass it directly and thus allow to make `Consumed` and `Materialized`
> optional. Something like:
>
> > windowedTable(String topicName, long windowSizeMs);
>
> As a (maybe better) alternative, we could also introduce a public
> `Windowed` interface similar to `Produced`, `Consumed`, `Materialized`
> etc. that we use to pass in window parameters. Or maybe reuse the existing
> `Windows` class (i.e., the same definition that is used in
> KGroupedStream#windowedBy() could be passed into the new `windowedTable()`
> method).
>
>
>
> I also noticed that the KIP only covers TimeWindows. Should we extend
> it to cover SessionWindows, too?
>
>
>
> > One side effect is that we bring `ChangeLoggingWindowBytesStore`
> public for unit testing purposes.

> No need to mention this, because this class is in package "internals"
> and not part of the public API.
>
>
>
>
> -Matthias
>
>
>
> On 5/24/18 10:39 PM, Boyang Chen wrote:
> > Hey friends,
> >
> >
> > I know this is a critical time for the 2.0 release. Just want to call out
> > again for further review on the API format. Any feedback would be
> > appreciated, thank you!
> >
> >
> > Boyang
> >
> > 
> > From: Liquan Pei 
> > Sent: Tuesday, May 22, 2018 4:29 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSSION] KIP-300: Add Windowed KTable API in
> StreamsBuilder
> >
> > This KIP makes sharing a WindowedKTable among Kafka Streams jobs very
> > easy. It would be nice to get this into trunk soon.
> >
> > Best,
> > Liquan
> >
> > On Mon, May 21, 2018 at 12:25 PM, Boyang Chen 
> wrote:
> >
> >> Hey all,
> >>
> >>
> >> I would like to start a discussion thread on KIP 300, which introduces a
> >> new API called windowedTable() in StreamsBuilder:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
> >>
> >>
> >> The pull request I'm working on is here: https://github.com/apache/
> >> kafka/pull/5044
> >>
> >>
> >> I understand that the community is busy working on the 2.0 release, but
> >> this KIP is really important for our internal use case. So if any of you
> >> have time, please focus on clarifying the use case and reaching agreement
> >> on the API. Really appreciate your time!
> >>
> >>
> >> Best,
> >>
> >> Boyang
> >>
> >>
> >>
> >
> >
> > --
> > Liquan Pei
> > Software Engineer, Confluent Inc
> >
>
>


-- 
-- Guozhang