Build failed in Jenkins: kafka-trunk-jdk8 #3250

2018-12-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6036: Local Materialization for Source KTable (#5779)

--
[...truncated 2.24 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

> Task :streams:streams-scala:spotbugsMain

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaJoin STARTED

org.apache.kafka.streams

Re: Kafka replication error

2018-12-09 Thread Suman B N
Thanks Tony.

How can I check the number of pending requests and the idle percentage
of the request handler on node 6? Are there any metrics for those?

Controller logs confirm that the controller is not able to send
updateMetadataRequest() to that particular node, which is acting as the
follower. To all other nodes it is able to send the request and get the
response, but with node 6 the channel is closed before it gets the response.
Hence we see the error logs mentioned in the earlier thread.

It also looks like we hit this
deadlock situation, which has been fixed in the next version.

Solutions tried are:

   - Restart the Kafka service on the follower node that is not syncing
   with the leader (node 6 in this example). Didn't help.
   - Invoke a controller election by removing the controller znode in
   ZooKeeper. Didn't help.
   - Restart the machine itself. Ironically, this worked for us! After the
   restart, the controller was able to send updateMetadataRequest to the node,
   and the node started syncing with the leader. It took some time to get
   in-sync, but it worked.

Thanks,
Suman

On Sun, Dec 9, 2018 at 11:53 AM Tony Liu 
wrote:

> you mentioned:
>
>1. broker disconnection error, normally the case I have ever seen is
>when some broker is busy and cannot respond to connections quickly to
> other
>replicas.
>2. partitions under-replicated, normally points to some broker that may have a
>performance issue.
>3. 90% under-replicated partitions have the same node, let 's say the
>broker id is 6.
>
> that gives me some idea your broker with id 6 may have some bottleneck, so
> can you also check the number of pending requests and the idle percentage
> of the requests handler on node 6?
>
> thanks.
>
> ty
>
> On Sat, Dec 8, 2018 at 9:02 PM Suman B N  wrote:
>
> > Still hoping for some help here.
> >
> > On Fri, Dec 7, 2018 at 12:24 AM Suman B N  wrote:
> >
> > > Guys,
> > > Another observation is 90% of under-replicated partitions have the same
> > > node as the follower.
> > >
> > > *Any help in here is very much appreciated. We have very less time to
> > > stabilize kafka. Thanks a lot in advance.*
> > >
> > > -Suman
> > >
> > > On Thu, Dec 6, 2018 at 9:08 PM Suman B N 
> wrote:
> > >
> > >> +users
> > >>
> > >> On Thu, Dec 6, 2018 at 9:01 PM Suman B N 
> wrote:
> > >>
> > >>> Team,
> > >>>
> > >>> We are observing ISR shrink and expand very frequently. In the logs
> of
> > >>> the follower, below errors are observed:
> > >>>
> > >>> [2018-12-06 20:00:42,709] WARN [ReplicaFetcherThread-2-15], Error in
> > >>> fetch kafka.server.ReplicaFetcherThread$FetchRequest@a0f9ba9
> > >>> (kafka.server.ReplicaFetcherThread)
> > >>> java.io.IOException: Connection to 15 was disconnected before the
> > >>> response was read
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3(NetworkClientBlockingOps.scala:114)
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3$adapted(NetworkClientBlockingOps.scala:112)
> > >>> at scala.Option.foreach(Option.scala:257)
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$1(NetworkClientBlockingOps.scala:112)
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.pollContinuously$extension(NetworkClientBlockingOps.scala:142)
> > >>> at
> > >>>
> >
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
> > >>> at
> > >>>
> >
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
> > >>> at
> > >>>
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
> > >>> at
> > >>>
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> > >>> at
> > >>>
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
> > >>> at
> > >>>
> >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> > >>> at
> > >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > >>>
> > >>> Can someone explain this? And help us understand how we can resolve
> > >>> these under-replicated partitions.
> > >>>
> > >>> server.properties file:
> > >>> broker.id=15
> > >>> port=9092
> > >>> zookeeper.connect=zk1,zk2,zk3,zk4,zk5,zk6
> > >>>
> > >>> default.replication.factor=2
> > >>> log.dirs=/data/kafka
> > >>> delete.topic.enable=true
> > >>> zookeeper.session.timeout.ms=1
> > >>> inter.broker.protocol.version=0.10.2
> > >>> num.partitions=3
> > >>> min.insync.replicas=1
> > >>> log.retention.ms=25920
> > >>> message.max.bytes=20971520
> > >>> replica.fetch.max.bytes=20971520
> > >>> replica.fetch.response.max.

Re: Kafka replication error

2018-12-09 Thread Tony Liu
Yes, Kafka provides metrics for monitoring the pending requests and the
idle percentage of the request handlers; check the documentation and you
should find them, otherwise let me know.
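
For what it's worth, here is a minimal sketch of reading the two relevant
broker metrics over JMX (this assumes broker 6 was started with remote JMX
enabled, e.g. JMX_PORT=9999; the host name and port below are made up):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class Broker6RequestHealth {
    public static void main(String[] args) throws Exception {
        // Assumption: broker 6 exposes remote JMX on port 9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker6-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            // Requests queued up waiting for a request handler thread.
            Object queueSize = conn.getAttribute(
                    new ObjectName("kafka.network:type=RequestChannel,name=RequestQueueSize"),
                    "Value");

            // Average fraction of time the request handler threads are idle;
            // values close to 0 mean the pool is saturated.
            Object idle = conn.getAttribute(
                    new ObjectName("kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent"),
                    "OneMinuteRate");

            System.out.println("RequestQueueSize = " + queueSize);
            System.out.println("RequestHandlerAvgIdlePercent (1 min) = " + idle);
        } finally {
            connector.close();
        }
    }
}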

From the operations you did:

   1. Restarting the service not helping is to be expected, since that broker
   is already the bottleneck.
   2. Forcing a controller election: I would not do that, since there is no
   controller issue.
   3. Restarting the server (id = 6) is what worked.

Since #3 worked, that basically gives me these clues:

   1. Restarting the server clears away all of the pending requests and
   unfinished work on broker 6.
   2. Restarting the server triggers a partition leader rebalance, so the
   other brokers help take over some of the traffic.

In terms of #1 & #2, though it is working for now, I suspect that server
will run into a bottleneck again after a few days.

By the way, you can spend some time comparing broker 6 with the other
servers, to see whether there are any hardware or network configuration
issues.


On Sun, Dec 9, 2018 at 1:22 AM Suman B N  wrote:

> Thanks Tony.
>
> How can I check the number of pending requests and the idle percentage
> of the requests handler on node 6? Are there any metrics for those?
>
> Controller logs confirm that the controller is not able to send
> updateMetadataRequest() to that particular node acting as the follower.
> Whereas to all other nodes, it is able to send the request and get the
> response. But with node 6, before it gets the response, the channel is
> closed. Hence we see the error logs mentioned in the earlier thread.
>
> Also looks like we hit this
>  deadlock situation. In
> the next version, it has been fixed.
>
> Solutions tried are:
>
>- Restart kafka service on that node or follower which is not syncing
>with the leader(node 6 as per example). Didn't help.
>- Invoke controller election by removing controller znode in zookeeper.
>Didn't help.
>- Restart the machine itself. Ironically, this worked for us! After the
>restart, the controller was able to send updateMetadataRequest to node
> and
>node started syncing with the leader. Took some time to be in-sync but
> it
>worked.
>
> Thanks,
> Suman
>
> On Sun, Dec 9, 2018 at 11:53 AM Tony Liu 
> wrote:
>
> > you mentioned:
> >
> >1. broker disconnection error, normally the case I have ever seen is
> >when some broker is busy and cannot respond to connections quickly to
> > other
> >replicas.
> >2. partitions under-replicated, normally points to some broker that may have
> a
> >performance issue.
> >3. 90% under-replicated partitions have the same node, let 's say the
> >broker id is 6.
> >
> > that gives me some idea your broker with id 6 may have some bottleneck,
> so
> > can you also check the number of pending requests and the idle percentage
> > of the requests handler on node 6?
> >
> > thanks.
> >
> > ty
> >
> > On Sat, Dec 8, 2018 at 9:02 PM Suman B N  wrote:
> >
> > > Still hoping for some help here.
> > >
> > > On Fri, Dec 7, 2018 at 12:24 AM Suman B N 
> wrote:
> > >
> > > > Guys,
> > > > Another observation is 90% of under-replicated partitions have the
> same
> > > > node as the follower.
> > > >
> > > > *Any help in here is very much appreciated. We have very less time to
> > > > stabilize kafka. Thanks a lot in advance.*
> > > >
> > > > -Suman
> > > >
> > > > On Thu, Dec 6, 2018 at 9:08 PM Suman B N 
> > wrote:
> > > >
> > > >> +users
> > > >>
> > > >> On Thu, Dec 6, 2018 at 9:01 PM Suman B N 
> > wrote:
> > > >>
> > > >>> Team,
> > > >>>
> > > >>> We are observing ISR shrink and expand very frequently. In the logs
> > of
> > > >>> the follower, below errors are observed:
> > > >>>
> > > >>> [2018-12-06 20:00:42,709] WARN [ReplicaFetcherThread-2-15], Error
> in
> > > >>> fetch kafka.server.ReplicaFetcherThread$FetchRequest@a0f9ba9
> > > >>> (kafka.server.ReplicaFetcherThread)
> > > >>> java.io.IOException: Connection to 15 was disconnected before the
> > > >>> response was read
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3(NetworkClientBlockingOps.scala:114)
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$3$adapted(NetworkClientBlockingOps.scala:112)
> > > >>> at scala.Option.foreach(Option.scala:257)
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.$anonfun$blockingSendAndReceive$1(NetworkClientBlockingOps.scala:112)
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.pollContinuously$extension(NetworkClientBlockingOps.scala:142)
> > > >>> at
> > > >>>
> > >
> >
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
> > > >

[DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-09 Thread Paul Whalen
Here's KIP-401 for discussion, a minor Kafka Streams API change that I
think could greatly increase the usability of the low-level processor API.
I have some code written but will wait to see if there is buy-in before
going all out and creating a pull request. It seems like most of the work
would be in updating documentation and tests.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Thanks!
Paul


Build failed in Jenkins: kafka-trunk-jdk8 #3251

2018-12-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6036: Follow-up; cleanup sendOldValues logic

--
[...truncated 2.24 MB...]
org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithoutClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testFromConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testFromConnect 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectInvalidValue STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectInvalidValue PASSED

org.apache.kafka.connect.converters.LongConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.LongConverterTest > testBytesNullToNumber 
PASS

Build failed in Jenkins: kafka-trunk-jdk11 #148

2018-12-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6036: Follow-up; cleanup sendOldValues logic

--
[...truncated 2.24 MB...]

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testShouldAutoShutdownOnIncompleteMetadata[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testShouldAutoShutdownOnIncompleteMetadata[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testShouldAutoShutdownOnIncompleteMetadata[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testShouldAutoShutdownOnIncompleteMetadata[caching enabled = false] PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin PASSED

org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenRecordConstraintIsViolated STARTED

org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenRecordConstraintIsViolated PASSED

org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenBytesConstraintIsViolated STARTED

org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenBytesConstraintIsViolated PASSED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin PASSED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldNotRestoreAbortedMessages STARTED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldNotRestoreAbortedMessages PASSED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldRestoreTransactionalMessages STARTED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldRestoreTransactionalMessages PASSED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldKStreamGlobalKTableJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest > 
shouldKStreamGlobalKTableJoin PASSED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest
 STARTED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest
 PASSED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverlappingPattern STARTED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverlappingPattern PASSED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverlappingTopic STARTED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverlappingTopic PASSED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets STARTED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets PASSED

org.apache.kafka.streams.integration.FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetR

Re: [VOTE] KIP-228 Negative record timestamp support

2018-12-09 Thread Guozhang Wang
Hi folks,

Thanks for your replies! Just to clarify, the proposal itself does not
introduce any additional fields in the message format (some new attributes
are mentioned in the Rejected Alternatives, though), so my understanding is
that we do not need to increment the magic byte version of the message
itself.

Also, ConsumerRecord.NO_TIMESTAMP (-1) is still used as "no timestamp", and
as we've discussed before it is a good trade-off to say "we do not have a way
to express Wednesday, December 31, 1969 11:59:59.999". I.e. we are extending
the timestamp to allow negative values other than -1, but we did not change
the semantics of the value -1 itself. So I think that although we normally
bump up the request protocol version for semantics changes, it is not
necessary in this case (please correct me if there are cases where that would
not work).
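
To make that trade-off concrete: today's producer client rejects every
negative timestamp up front, with -1 reserved as the sentinel, and under the
proposal only -1 would keep its special meaning. A small sketch against the
current client API (the topic name and key / value below are made up):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NegativeTimestampToday {
    public static void main(String[] args) {
        // -1 is the existing "no timestamp" sentinel; the KIP keeps this semantics.
        System.out.println(ConsumerRecord.NO_TIMESTAMP);  // prints -1

        // Pre-KIP clients refuse any other negative timestamp, e.g. a pre-epoch
        // event time (1969-07-20T20:17:40Z is roughly -14182940000 ms):
        try {
            new ProducerRecord<>("events", null, -14182940000L, "apollo11", "landing");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected today: " + e.getMessage());
        }
        // With KIP-228, the record above would become legal, while the single
        // value -1 (i.e. 1969-12-31T23:59:59.999) stays inexpressible.
    }
}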

I do agree with Magnus's point that for ListOffsetsRequest, we should
consider using different values than -1 / -2 to indicate `EARLIEST / LATEST
TIMESTAMP` now, since for example timestamp -2 does have meaningful
semantics now, and hence its protocol version would need a bump as we change
its field. And I think a single byte indicating the type as (EARLIEST,
LATEST, ACTUAL_TIMESTAMP_VALUE) should be sufficient, but I'll leave it to
Konstantin to decide whether he wants to do this in this KIP or in another
follow-up KIP.
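
Purely for illustration, a hypothetical shape of the field being discussed
(this is not existing protocol code; all names below are made up):

// Hypothetical sketch only: an explicit lookup kind for ListOffsetsRequest, so
// that -1 / -2 no longer need to double as LATEST / EARLIEST sentinels and any
// long value (including negatives) can be treated as a real timestamp.
public enum LookupType {
    BEGINNING((byte) -2),
    END((byte) -1),
    TIMESTAMP((byte) 0);

    public final byte id;

    LookupType(final byte id) {
        this.id = id;
    }
}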


Guozhang


On Fri, Dec 7, 2018 at 5:06 PM Jun Rao  wrote:

> Hi, Konstantin,
>
> Thanks for the KIP. I agree with Magnus on the protocol version changes. As
> for the sentinel value, currently ConsumerRecord.NO_TIMESTAMP (-1) is used
> for V0 message format. For compatibility, it seems that we still need to
> preserve that.
>
> Jun
>
> On Thu, Dec 6, 2018 at 2:32 AM Magnus Edenhill  wrote:
>
> > Sorry for getting in the game this late, and on the wrong thread!
> >
> > I think negative timestamps makes sense and is a good addition,
> > but I have a couple of concerns with the proposal:
> >
> >  1. I believe any change to the protocol format or semantics require a
> > protocol bump, in this case for ProduceRequest, FetchRequest,
> > ListOffsetsRequest.
> >  2. ListOffsetsRequest should be changed to allow logical (END,
> BEGINNING)
> > and absolute lookups without special treatment of two absolute values as
> > logical (-1, -2), this seems like a hack and will require application
> logic
> > to avoid these timestamps, that's leaky abstraction.
> >  Perhaps add a new field `int8 LookupType = { BEGINNING=-2, END=-1,
> > TIMESTAMP=0 }`: the broker will either look up using the absolute
> > Timestamp, or logical offset value, depending on the value of LookupType.
> >  3. With the added Attribute for extended timestamp, do we really need to
> > have a sentinel value for an unset timestamp (-1 or Long.MIN_VALUE)?
> >  To make the logic simpler I suggest the attribute is renamed to just
> > Timestamp, and if the Timestamp attribute is set, the Timestamp field is
> > always a proper timestamp. If the bit is not set, no timestamp was
> > provided.
> >
> >  /Magnus
> >
> >
> >
> > Den tors 6 dec. 2018 kl 08:06 skrev Gwen Shapira :
> >
> > > I may be missing something, but why are we using an attribute for
> > > this? IIRC, we normally bump protocol version to indicate semantic
> > > changes. If I understand correctly, not using an attribute will allow
> > > us to not change the message format (just the protocol), which makes
> > > the upgrade significantly easier (since we don't up/down convert).
> > >
> > > Another thing I don't understand: The compatibility map indicates that
> > > NO_TIMESTAMP is now Long.MIN_VALUE, but a bit above that you say that
> > > -1 semantics does not change.
> > >
> > > Last: At around version 1.0 we decide to completely avoid changes that
> > > require a particular order of upgrades (this is why we added the
> > > versions API, up/down conversion, etc). So I'd like to see if we can
> > > avoid this here (should be relatively easy?). I'm CCing Magnus,
> > > because I think he remembers *why* we made this decision in first
> > > place.
> > >
> > > Gwen
> > > On Thu, Dec 6, 2018 at 4:44 PM Guozhang Wang 
> wrote:
> > > >
> > > > Bump up on this thread again: we have two binding votes already and
> > need
> > > > another committer to take a look at it and vote.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Oct 19, 2018 at 11:34 AM Konstantin Chukhlomin <
> > > chuhlo...@gmail.com>
> > > > wrote:
> > > >
> > > > > bump
> > > > >
> > > > > On Tue, Jun 12, 2018 at 1:48 PM Bill Bejeck 
> > wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > > On Sun, Jun 10, 2018 at 5:32 PM Ted Yu 
> > wrote:
> > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > On Sun, Jun 10, 2018 at 2:17 PM, Matthias J. Sax <
> > > > > matth...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (binding)
> > > > > > > >
> > > > > > > > Thanks for the KIP.
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-09 Thread Guozhang Wang
Hello Paul,

Thanks for the great writeup (very detailed and crystal-clear motivation
sections!).

This is quite an interesting idea and I do like the API cleanness you
proposed. The original motivation for letting StreamsTopology add state
stores, though, is to allow different processors to share the same state
store. For example:

builder.addStore("store1");

// a path of stream transformations that leads to KStream stream1.
stream1.transform(..., "store1");

// another path that generates a KStream stream2.
stream2.transform(..., "store1");

Behind the scenes, Streams will make sure the stream1 / stream2
transformations are always grouped together into a single group of tasks,
each of which is executed by a single thread, and hence there are no
concurrency issues when accessing the store from different operators within
the same task. I'm not sure how common this use case is, but I'd like to
hear if you have any thoughts on maintaining it, since the current proposal
seems to exclude this possibility.
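
For reference, roughly what that sharing pattern looks like with the current
API (a sketch only; the topic names, serdes and the counting logic are made
up, but the store registration and transform() wiring follow the snippet
above):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreExample {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Register "store1" once on the builder so several processors can share it.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("store1"),
                Serdes.String(), Serdes.Long()));

        final KStream<String, String> stream1 = builder.stream("input-topic-1");
        final KStream<String, String> stream2 = builder.stream("input-topic-2");

        final TransformerSupplier<String, String, KeyValue<String, String>> counting =
                SharedStoreExample::countingTransformer;

        // Both paths name "store1", so Streams groups them into the same tasks
        // and they read and write the same local store instance.
        stream1.transform(counting, "store1");
        stream2.transform(counting, "store1");

        return builder.build();
    }

    // A hypothetical transformer that counts how often it has seen each key.
    private static Transformer<String, String, KeyValue<String, String>> countingTransformer() {
        return new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, Long> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore("store1");
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
                final Long seen = store.get(key);
                store.put(key, seen == null ? 1L : seen + 1);
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() { }
        };
    }
}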


Guozhang


On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:

> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> think could greatly increase the usability of the low-level processor API.
> I have some code written but will wait to see if there is buy in before
> going all out and creating a pull request.  It seems like most of the work
> would be in updating documentation and tests.
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>
> Thanks!
> Paul
>


-- 
-- Guozhang


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-12-09 Thread Guozhang Wang
Hello Adam / Jan / John,

Sorry for being late on this thread! I've finally got some time this
weekend to clean up a load of tasks on my queue (actually I've also realized
there are a bunch of other things I need to enqueue while cleaning them up
--- something I need to improve on my side). So here are my thoughts:

Regarding the APIs: I like the current written API in the KIP. More
generally, I'd prefer to keep 1) the one-to-many join functionality as
well as 2) join types other than inner as separate KIPs, since 1) may warrant
a general API refactoring that can benefit not only foreign-key joins but
collocated joins as well (e.g. an extended proposal of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
and I'm not sure the other join types would actually be needed (maybe left
join still makes sense), so it's better to wait-for-people-to-ask-and-add
than add-something-that-no-one-uses.

Regarding whether we enforce steps 3) / 4) vs. introducing a
KScatteredTable for users to inject their own optimization: I'd prefer to
keep the current option as-is, and my main rationale is the room for
optimization inside the Streams internals and the API succinctness. For
advanced users who may indeed prefer a KScatteredTable and doing their own
optimization, but for whom using the Processor API directly is too much work,
I think we can still extend the current API to support it in the future if it
becomes necessary.

Another note about step 4), resolving out-of-order data: as I mentioned
before, I think that with KIP-258 (embedded timestamps in the key-value
store) we can actually make this step simpler than the current proposal. In
fact, we can just keep a single final-result store with timestamps and reject
values that have a smaller timestamp, is that right?
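
To spell that guard out (a hypothetical sketch of step 4), not the KIP's
actual code; the value holder and the method name are made up):

import org.apache.kafka.streams.state.KeyValueStore;

public class FinalResultGuard {

    // Minimal value holder; a KIP-258 timestamped store would make this unnecessary.
    public static final class TimestampedValue {
        final String value;
        final long timestamp;

        TimestampedValue(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Only overwrite the final join result if the incoming record is at least as
    // new as what is already stored; otherwise it is a stale, out-of-order update
    // from the foreign-key fan-out and is dropped.
    public static void maybeUpdate(final KeyValueStore<String, TimestampedValue> store,
                                   final String key,
                                   final String joinedValue,
                                   final long recordTimestamp) {
        final TimestampedValue current = store.get(key);
        if (current == null || recordTimestamp >= current.timestamp) {
            store.put(key, new TimestampedValue(joinedValue, recordTimestamp));
        }
    }
}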


That's all I have in mind for now. Again, great appreciation to Adam for
making such HUGE progress on this KIP!


Guozhang

On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak 
wrote:

> If they don't find the time:
> They usually take the opposite path from me :D
> so the answer would be clear.
>
> hence my suggestion to vote.
>
>
> On 04.12.2018 21:06, Adam Bellemare wrote:
> > Hi Guozhang and Matthias
> >
> > I know both of you are quite busy, but we've gotten this KIP to a point
> > where we need more guidance on the API (perhaps a bit of a tie-breaker,
> if
> > you will). If you have anyone else you may think should look at this,
> > please tag them accordingly.
> >
> > The scenario is as such:
> >
> > Current Option:
> > API:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> > 1) Rekey the data to CombinedKey, and shuffles it to the partition with
> the
> > foreignKey (repartition 1)
> > 2) Join the data
> > 3) Shuffle the data back to the original node (repartition 2)
> > 4) Resolve out-of-order arrival / race condition due to foreign-key
> changes.
> >
> > Alternate Option:
> > Perform #1 and #2 above, and return a KScatteredTable.
> > - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR>
> (KO
> > = Other Table Key, K = This Table Key, VR = Joined Result)
> > - KScatteredTable.resolve() would perform #3 and #4 but otherwise a user
> > would be able to perform additional functions directly from the
> > KScatteredTable (TBD - currently out of scope).
> > - John's analysis 2-emails up is accurate as to the tradeoffs.
> >
> > Current Option is coded as-is. Alternate option is possible, but will
> > require for implementation details to be made in the API and some
> exposure
> > of new data structures into the API (ie: CombinedKey).
> >
> > I appreciate any insight into this.
> >
> > Thanks.
> >
> > On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare 
> > wrote:
> >
> >> Hi John
> >>
> >> Thanks for your feedback and assistance. I think your summary is
> accurate
> >> from my perspective. Additionally, I would like to add that there is a
> risk
> >> of inconsistent final states without performing the resolution. This is
> a
> >> major concern for me as most of the data I have dealt with is produced
> by
> >> relational databases. We have seen a number of cases where a user in the
> >> Rails UI has modified the field (foreign key), realized they made a
> >> mistake, and then updated the field again with a new key. The events are
> >> propagated out as they are produced, and as such we have had real-world
> >> cases where these inconsistencies were propagated downstream as the
> final
> >> values due to the race conditions in the fanout of the data.
> >>
> >> This solution that I propose values correctness of the final result over
> >> other factors.
> >>
> >> We could always move this function over to using a KScatteredTable
> >> implementation in the future, and simply deprecate it this join API in
> >> time. I think I would like to hear more from some of the other major
> >> committers on which course of action they would think is best before any
> >> more coding is done.
> >