Build failed in Jenkins: kafka-trunk-jdk14 #63

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR - Increase the number of Trogdor Histogram buckets to 1


--
[...truncated 3.08 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enable

Build failed in Jenkins: kafka-trunk-jdk11 #1432

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR - Increase the number of Trogdor Histogram buckets to 1


--
[...truncated 3.08 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldRetu

Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-07 Thread Cheng Tan
Hi Colin,

Sorry for the confusion. I'm proposing to implement the timeout in 
NetworkClient.leastLoadedNode() when iterating over all the cached nodes. The 
alternative I can think of is to implement the timeout in NetworkClient.poll().

I'd prefer to implement it in leastLoadedNode(). Here are the reasons:

1. Usually when clients send a request, they ask the network client to send it 
to a specific node. In this case, connection.setup.timeout won't matter too 
much, because the client doesn't want to try other nodes for that specific 
request; the request-level timeout is enough. The metadata fetcher fetches the 
node status periodically, so the client can reassign the request to another 
node after a timeout.
2. Consumer, producer, and AdminClient all use leastLoadedNode() for metadata 
fetches, where the connection setup timeout can play an important role. Unlike 
other requests, which can refer to the metadata for node condition, metadata 
requests can only blindly choose a node for retry in the worst scenario. We 
want to make sure the client can get the metadata smoothly and as soon as 
possible. As a result, we need this connection.setup.timeout.
3. Implementing the timeout in poll() or anywhere else might need an extra 
iteration over all nodes, which might degrade network client performance.
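To make this concrete, here is a very rough, hypothetical sketch of the 
per-node check I have in mind inside leastLoadedNode(); all the names below 
are illustrative, not the actual NetworkClient fields:

public class ConnectionSetupTimeoutSketch {

    // Hypothetical check run for each cached node while leastLoadedNode()
    // iterates: if a node has been stuck in the connecting state longer than
    // the configured connection.setup.timeout, give up on it and move on to
    // the next candidate instead of waiting for the OS-level TCP timeout.
    static boolean connectionSetupExpired(long nowMs, long connectStartMs,
                                          long setupTimeoutMs, boolean stillConnecting) {
        return stillConnecting && (nowMs - connectStartMs) > setupTimeoutMs;
    }

    public static void main(String[] args) {
        // Example: a node that has been connecting for 12s with a 10s timeout is dropped.
        System.out.println(connectionSetupExpired(12_000L, 0L, 10_000L, true)); // true
    }
}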
I also updated the KIP content and KIP status. Please let me know if the above 
ideas make sense. Thanks.

Best, - Cheng Tan



> On May 4, 2020, at 5:26 PM, Colin McCabe  wrote:
> 
> Hi Cheng,
> 
> On the KIP page, it lists this KIP as "draft."  It seems like "under 
> discussion" is appropriate here, right?
> 
>> Currently, the initial socket connection timeout depends on the Linux 
>> kernel setting
>> tcp_syn_retries. The timeout value is 2 ^ (tcp_syn_retries + 1) - 1 
>> seconds. For the
>> reasons below, we want to control the client-side socket timeout directly 
>> using 
>> configuration files
> 
> Linux is just one example of an OS that Kafka could run on, right?  You could 
> also be running on MacOS, for example.
> 
>> I'm proposing to do a lazy socket connection time out. That is, we only 
>> check if
>> we need to timeout a socket when we consider the corresponding node as a 
>> candidate in the node provider.
> 
> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't we 
> implement a connection setup timeout for all clients, not just AdminClient?
> 
> best,
> Colin
> 
> On Mon, May 4, 2020, at 13:18, Colin McCabe wrote:
>> Hmm.  A big part of the reason behind the KIP is that the default 
>> connection timeout behavior of the OS doesn't work for Kafka, right?  
>> For example, on Linux, if we wait 127 seconds for a connection attempt 
>> to time out, we won't get a chance to make another attempt in most 
>> cases.  So I think it makes sense to set a shorter default.
>> 
>> best,
>> Colin
>> 
>> 
>> On Mon, May 4, 2020, at 09:44, Jose Garcia Sancio wrote:
>>> Thanks for the KIP Cheng,
>>> 
 The default value will be 10 seconds.
>>> 
>>> I think we should make the default the current behavior. Meaning the
>>> default should leverage the default connect timeout from the operating
>>> system.
>>> 
 Proposed Changes
>>> 
>>> I don't fully understand this section. It seems like it is mainly
>>> focused on the problem with the current implementation. Can you
>>> explain how the proposed changes solve the problem?
>>> 
>>> Thanks.
>>> 
>>> 
>>> -- 
>>> -Jose
>>> 
>> 



[REVIEW REQUEST] KAFKA-3184: Add Checkpoint for In-memory State Store

2020-05-07 Thread Nikolay Izhikov
Hello, Kafka Team.

I prepared a PR [1] for KAFKA-3184 [2].
Could someone please review it?


[1] https://github.com/apache/kafka/pull/8592
[2] https://issues.apache.org/jira/browse/KAFKA-3184


Re: [DISCUSS] KIP-554: Add Broker-side SCRAM Config API

2020-05-07 Thread Rajini Sivaram
Hi Colin,

Thanks for the KIP. A couple of comments below:

1) The SCRAM password is never sent over the wire today, not by clients and not
by tools. A salted-hashed version of it is sent over the wire to ZooKeeper,
stored there, and read by brokers from ZK. Another randomly-salted-hashed
version is sent by clients during authentication. The transformation of the
password into its salted form is performed by the kafka-configs tool, and I
think we should continue to do the same. We should still treat this credential
as a `password` config to ensure we don't log it anywhere. One of the biggest
advantages of SCRAM is that the broker (or ZooKeeper) is never in possession of
the client password: it can verify the client password, but cannot impersonate
the user with that password. The proposed API breaks that, and hence we should
perform the transformation in the tool, not on the broker (a rough sketch of
what I mean is below).

2) The naming in the API seems a bit confusing. A SCRAM mechanism is a thing
in SASL, so ScramMechanism would be SCRAM-SHA-256 or SCRAM-SHA-512. These
are standard names (though we use underscores instead of hyphens for the enums).
The underlying algorithms are internal and don't need to be in the public
API. We are using ScramMechanism in the new API to refer to a
ScramCredential, and ScramMechanismType to use strings that are not the
actual SCRAM mechanisms. Perhaps these could just be `ScramMechanism` and
`ScramCredential`, as they currently are in the Kafka codebase, but
refactored to separate the internals from the public API?
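To illustrate point 1), here is a minimal, purely illustrative sketch of
deriving the salted credential on the tool side (SCRAM's Hi() function is
PBKDF2 with HMAC-SHA-256); the class and names are mine, not the actual
kafka-configs code:

import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.security.SecureRandom;

// Illustrative only: derive the salted SCRAM credential locally so the
// clear-text password never leaves the tool. Iteration count and sizes are
// example values.
public class ScramSaltedPasswordSketch {
    public static byte[] saltedPassword(char[] password, byte[] salt, int iterations)
            throws Exception {
        // SCRAM-SHA-256's Hi(password, salt, i) is PBKDF2 with HMAC-SHA-256.
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = new byte[32];
        new SecureRandom().nextBytes(salt);
        byte[] salted = saltedPassword("example-password".toCharArray(), salt, 4096);
        // Only the salt, iteration count, and keys derived from 'salted'
        // (StoredKey/ServerKey) would be sent to the broker or stored in ZooKeeper.
        System.out.println("salted password bytes: " + salted.length);
    }
}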

Regards,

Rajini


On Thu, May 7, 2020 at 5:48 AM Colin McCabe  wrote:

> On Tue, May 5, 2020, at 08:12, Tom Bentley wrote:
> > Hi Colin,
> >
> > SCRAM is better than SASL PLAIN because it doesn't send the password over
> > the wire in the clear. Presumably this property is important for some
> users
> > who have chosen to use SCRAM. This proposal does send the password in the
> > clear when setting the password. That doesn't mean it can't be used
> > securely (e.g. connect over TLS–if available–when setting or changing a
> > password, or connect to the broker from the same machine over localhost),
> > but won't this just result in some CVE against Kafka? It's a tricky
> problem
> > to solve in a cluster without TLS (you basically just end up reinventing
> > TLS).
> >
>
> Hi Tom,
>
> Thanks for the thoughtful reply.
>
> If you don't set up SSL, we currently do send passwords in the clear over
> the wire.  There's just no other option-- as you yourself said, we're not
> going to reinvent TLS from first principles.  So this KIP isn't changing
> our policy about this.
>
> One example of this is if you have a zookeeper connection and it is not
> encrypted, your SCRAM password currently goes over the wire in the clear
> when you run the kafka-configs.sh command.  Another example is if you have
> one plaintext Kafka endpoint and one SSL Kafka endpoint, you can send the
> keystore password for the SSL endpoint in cleartext over the plaintext
> endpoint.
>
> >
> > I know you're not a fan of the ever-growing list of configs, but when
> > I wrote KIP-506 I suggested some configs which could have been used to at
> > least make it secure by default.
> >
>
> I think if we want to add a configuration like that, it should be done in
> a separate KIP, because it affects more than just SCRAM.  We would also
> have to disallow setting any "sensitive" configuration over
> IncrementalAlterConfigs / AlterConfigs.
>
> Although I haven't thought about it that much, I doubt that such a KIP
> would be successful.  Think about who still uses plaintext mode.
> Developers use it for testing things locally.  They don't want additional
> restrictions on what they can do.  Sysadmins who are really convinced that
> their network is secure (I know, I know...) or who are setting up a
> proof-of-concept might use plaintext mode.  They don't want restrictions
> either.
>
> If the network is insecure and you're using plaintext, then we shouldn't
> allow you to send or receive messages either, since they could contain
> sensitive data.  So I think it's impossible to follow this logic very far
> before you arrive at plaintext delenda est.  And indeed, there have been
> people who have said we should remove the option to use plaintext mode from
> Kafka.  But so far, we're not ready to do that.
>
> >
> > You mentioned on the discussion for KIP-595 that there's a bootstrapping
> > problem to be solved in this area. Maybe KIP-595 is the better place for
> > that, but I wondered if you had any thoughts about it. I thought about
> > using a broker CLI option to read a password from stdin (`--scram-user
> tom`
> > would prompt for the password for user 'tom' on boot), that way the
> > password doesn't have to be on the command line arguments or in a file.
> In
> > fact this could be a solution to both the bootstrap problem and plaintext
> > password problem in the absence of TLS.
> >
>
> Yeah, I think this would be a good improvement.  The ability to read a
> p

Re: [DISCUSS] KIP-554: Add Broker-side SCRAM Config API

2020-05-07 Thread Jakub Scholz
Hi Colin,

Could you clarify how this fits with KIP-506, which seems to deal with the
same thing?

Thanks & Regards
Jakub

On Fri, May 1, 2020 at 8:18 AM Colin McCabe  wrote:

> Hi all,
>
> I posted a KIP about adding a new SCRAM configuration API on the broker.
> Check it out here if you get a chance:
> https://cwiki.apache.org/confluence/x/ihERCQ
>
> cheers,
> Colin
>


Build failed in Jenkins: kafka-2.4-jdk8 #197

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[konstantine] KAFKA-9419: Fix possible integer overflow in CircularIterator 
(#7950)

[konstantine] KAFKA-9768: Fix handling of rest.advertised.listener config 
(#8360)


--
[...truncated 7.62 MB...]
org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableT

Build failed in Jenkins: kafka-2.5-jdk8 #111

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[konstantine] KAFKA-9768: Fix handling of rest.advertised.listener config 
(#8360)


--
[...truncated 5.90 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] S

Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Tom Bentley
Hi Sönke,

Replies inline

1. The functionality in this first phase could indeed be achieved with
> custom serializers, that would then need to wrap the actual serializer that
> is to be used. However, looking forward I intend to add functionality that
> allows configuration to be configured broker-side via topic level configs
> and investigate encrypting entire batches of messages for performance. Both
> those things would require us to move past doing this in a serializer, so I
> think we should take that plunge now to avoid unnecessary refactoring later
> on.
>

I suspect you might have a hard time getting this KIP approved when the
immediate use cases it serves can already be implemented using custom
serialization.

Having a working implementation using custom serialization would:

* prove there's interest in these features amongst end users
* prove that there's interest in the specific features which would require
end-to-end encryption to be implemented in Kafka itself
* validate that the interfaces/abstractions in this proposal are the right
ones

All of those things would strengthen the argument for getting this into
Apache Kafka eventually.
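Just to illustrate that custom-serialization route, a wrapping serializer
might look roughly like the sketch below; the cipher choice and the key lookup
are placeholders rather than a vetted design:

import org.apache.kafka.common.serialization.Serializer;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.ByteBuffer;
import java.security.SecureRandom;

// Minimal sketch of a serializer that wraps another serializer and encrypts
// its output per record. Key management (resolveKey) is deliberately left abstract.
public class EncryptingSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;
    private final SecureRandom random = new SecureRandom();

    public EncryptingSerializer(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            byte[] plaintext = inner.serialize(topic, data);
            if (plaintext == null) return null;
            byte[] iv = new byte[12];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, resolveKey(topic), new GCMParameterSpec(128, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            // Prepend the IV so a matching deserializer can decrypt.
            return ByteBuffer.allocate(iv.length + ciphertext.length)
                    .put(iv).put(ciphertext).array();
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed for topic " + topic, e);
        }
    }

    private SecretKey resolveKey(String topic) {
        // Placeholder: look up the per-topic key from whatever key manager is configured.
        throw new UnsupportedOperationException("key lookup not implemented in this sketch");
    }
}

A matching deserializer would strip the IV and decrypt before delegating to
the wrapped deserializer.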


> 2. Absolutely! I am currently working on a very (very) rough implementation
> to kind of prove the principle. I'll add those to the KIP as soon as I
> think they are in a somewhat final form.
> There are a lot of design details missing from the KIP, I didn't want to go
> all the way just for people to hate what I designed and have to start over
> ;)
>
> 3. Yes. I plan to create a LocalKeystoreKeyManager (name tbd) as part of
> this KIP that allows configuring keys per topic pattern and will read the
> keys from a local file. This will provide encryption, but users would have
> to manually sync keystores across consumer and producer systems. Proper key
> management with rollover and retrieval from central vaults would come in a
> later phase.
>

I think this is the hard part in many respects. Having a working
implementation for at least one key management system would presumably be a
prerequisite for getting this merged.

Even if this KIP got merged I think it's likely that there would be a
desire to limit the number of implementations of the interfaces within
Apache Kafka because of the maintenance and testing burden. (We've seen
this in other areas previously, ConfigProviders being one example.)

So again, this suggests to me that you might make more progress
implementing this outside Apache Kafka for the moment.

Having said all that, these are just my thoughts second guessing what the
community might do. I might be wrong.

Kind regards,

Tom


Kafka Connector

2020-05-07 Thread monisha kodi
Hi,

This is Monisha. I am a Netsuite Developer. I want to connect Netsuite and
Apache Kafka. It would be great if you could provide me with information about
how to connect Kafka and Netsuite.
Looking forward to your reply.

-- 
thank you.

Monisha Kodi
Phone: +49 15124770514


Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Michael André Pearce

Small typo correction: I meant headers at the end of this paragraph, not keys 
(sorry, long week already)


corrected:


"
Second, I would suggest we do not add an additional section into the record 
specifically for this (again I would be a little -1 here). The whole point of 
headers being added is that additional bits such as this can be layered on top 
of headers, e.g. the AES or other data that needs to travel with the record 
should be set into headers.
"

On 7 May 2020 at 8:47, Michael André Pearce  
wrote:


Hi 


I have just spotted this.


I would be a little -1 on encrypting headers; these are NOT safe to encrypt. The 
whole original reason for headers was for non-sensitive transport or other 
meta-information details, very akin to TCP headers, e.g. those also are not 
encrypted. These should remain un-encrypted so tools that are simply bridging 
messages between brokers/systems can rely on headers for this, without needing 
to peek inside the business payload part (or decrypting it).


Second, I would suggest we do not add an additional section into the record 
specifically for this (again I would be a little -1 here). The whole point of 
headers being added is that additional bits such as this can be layered on top 
of headers, e.g. the AES or other data that needs to travel with the record 
should be set into keys.


Please see both the original KIP-82 but more importantly the case and uses that 
they were added for.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers


Best
Mike



On 1 May 2020 at 23:18, Sönke Liebau  wrote:


Hi Tom,

thanks for taking a look!

Regarding your questions, I've answered below, but will also add more
detail to the KIP around these questions.

1. The functionality in this first phase could indeed be achieved with
custom serializers, that would then need to wrap the actual serializer that
is to be used. However, looking forward I intend to add functionality that
allows configuration to be configured broker-side via topic level configs
and investigate encrypting entire batches of messages for performance. Both
those things would require us to move past doing this in a serializer, so I
think we should take that plunge now to avoid unnecessary refactoring later
on.

2. Absolutely! I am currently working on a very (very) rough implementation
to kind of prove the principle. I'll add those to the KIP as soon as I
think they are in a somewhat final form.
There are a lot of design details missing from the KIP, I didn't want to go
all the way just for people to hate what I designed and have to start over
;)

3. Yes. I plan to create a LocalKeystoreKeyManager (name tbd) as part of
this KIP that allows configuring keys per topic pattern and will read the
keys from a local file. This will provide encryption, but users would have
to manually sync keystores across consumer and producer systems. Proper key
management with rollover and retrieval from central vaults would come in a
later phase.

4. I'm not 100% sure I follow your meaning here tbh. But I think the
question may be academic in this first instance, as compression happens at
batch level, so we can't encrypt at the record level after that. If we want
to stick with encrypting individual records, that would have to happen
pre-compression, unless I am mistaken about the internals here.

Best regards,
Sönke


On Fri, 1 May 2020 at 18:19, Tom Bentley  wrote:


Hi Sönke,


I never looked at the original version, but what you describe in the new
version makes sense to me.


Here are a few things which sprang to mind while I was reading:


1. It wasn't immediately obvious why this can't be achieved using custom
serializers and deserializers.
2. It would be useful to fully define the Java interfaces you're talking
about.
3. Would a KeyManager implementation be provided?
4. About compression+encryption: My understanding is CRIME used a chosen
plaintext attack. AFAICS using compression would potentially allow a known
plaintext attack, which is a weaker way of attacking a cipher. Even without
compression in the picture known plaintext attacks would be possible, for
example if the attacker knew the key was JSON encoded.


Kind regards,


Tom


On Wed, Apr 29, 2020 at 12:32 AM Sönke Liebau
 wrote:



All,

I've asked for comments on this KIP in the past, but since I didn't really
get any feedback I've decided to reduce the initial scope of the KIP a bit
and try again.

I have reworked the KIP to provide a limited, but useful set of features for
this initial KIP and laid out a very rough roadmap of what I'd envision
this looking like in a final version.

I am aware that the KIP is currently light on implementation details, but
would like to get some feedback on the general approach before fully
speccing everything.

The KIP can be found at



https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+t

Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Michael André Pearce

Hi 


I have just spotted this.


I would be a little -1 on encrypting headers; these are NOT safe to encrypt. The 
whole original reason for headers was for non-sensitive transport or other 
meta-information details, very akin to TCP headers, e.g. those also are not 
encrypted. These should remain un-encrypted so tools that are simply bridging 
messages between brokers/systems can rely on headers for this, without needing 
to peek inside the business payload part (or decrypting it).


Second, I would suggest we do not add an additional section into the record 
specifically for this (again I would be a little -1 here). The whole point of 
headers being added is that additional bits such as this can be layered on top 
of headers, e.g. the AES or other data that needs to travel with the record 
should be set into keys.


Please see both the original KIP-82 but more importantly the case and uses that 
they were added for.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers


Best
Mike



On 1 May 2020 at 23:18, Sönke Liebau  wrote:


Hi Tom,

thanks for taking a look!

Regarding your questions, I've answered below, but will also add more
detail to the KIP around these questions.

1. The functionality in this first phase could indeed be achieved with
custom serializers, that would then need to wrap the actual serializer that
is to be used. However, looking forward I intend to add functionality that
allows configuration to be configured broker-side via topic level configs
and investigate encrypting entire batches of messages for performance. Both
those things would require us to move past doing this in a serializer, so I
think we should take that plunge now to avoid unnecessary refactoring later
on.

2. Absolutely! I am currently working on a very (very) rough implementation
to kind of prove the principle. I'll add those to the KIP as soon as I
think they are in a somewhat final form.
There are a lot of design details missing from the KIP, I didn't want to go
all the way just for people to hate what I designed and have to start over
;)

3. Yes. I plan to create a LocalKeystoreKeyManager (name tbd) as part of
this KIP that allows configuring keys per topic pattern and will read the
keys from a local file. This will provide encryption, but users would have
to manually sync keystores across consumer and producer systems. Proper key
management with rollover and retrieval from central vaults would come in a
later phase.

4. I'm not 100% sure I follow your meaning here tbh. But I think the
question may be academic in this first instance, as compression happens at
batch level, so we can't encrypt at the record level after that. If we want
to stick with encrypting individual records, that would have to happen
pre-compression, unless I am mistaken about the internals here.

Best regards,
Sönke


On Fri, 1 May 2020 at 18:19, Tom Bentley  wrote:


Hi Sönke,


I never looked at the original version, but what you describe in the new
version makes sense to me.


Here are a few things which sprang to mind while I was reading:


1. It wasn't immediately obvious why this can't be achieved using custom
serializers and deserializers.
2. It would be useful to fully define the Java interfaces you're talking
about.
3. Would a KeyManager implementation be provided?
4. About compression+encryption: My understanding is CRIME used a chosen
plaintext attack. AFAICS using compression would potentially allow a known
plaintext attack, which is a weaker way of attacking a cipher. Even without
compression in the picture known plaintext attacks would be possible, for
example if the attacker knew the key was JSON encoded.


Kind regards,


Tom


On Wed, Apr 29, 2020 at 12:32 AM Sönke Liebau
 wrote:



All,

I've asked for comments on this KIP in the past, but since I didn't really
get any feedback I've decided to reduce the initial scope of the KIP a bit
and try again.

I have reworked the KIP to provide a limited, but useful set of features for
this initial KIP and laid out a very rough roadmap of what I'd envision
this looking like in a final version.

I am aware that the KIP is currently light on implementation details, but
would like to get some feedback on the general approach before fully
speccing everything.

The KIP can be found at



https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka



I would very much appreciate any feedback!

Best regards,
Sönke






--
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


LogManager.nextLogDir should consider total partition size not count

2020-05-07 Thread Igor Soarez


When running Kafka with multiple log directories, 
kafka.log.LogManager.getOrCreateLog selects the first available log directory 
with the smallest number of topic partitions.
Topic partitions can have different sizes, and this policy easily leads to data 
imbalances between log directories (or disks).

It isn't hard to change the policy (or add a configuration option to change it) 
so that the directory picked is the one with the smallest total size of logs, 
i.e. the least used storage-wise (see the rough sketch below). I have a patch 
and tests; what's the best way to go about this? Open a PR? Create a JIRA 
first? Create a KIP first?
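For illustration, the selection policy I have in mind is roughly the following
(written as a standalone Java sketch rather than the actual Scala LogManager
code; the per-directory size map is assumed to be computed elsewhere):

import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeastUsedLogDirSketch {
    // Pick the log directory with the smallest total size of logs rather than
    // the smallest partition count. 'totalLogBytesByDir' is assumed to be
    // computed elsewhere (e.g. by summing segment sizes per directory).
    public static File nextLogDir(List<File> logDirs, Map<File, Long> totalLogBytesByDir) {
        return logDirs.stream()
                .min(Comparator.comparingLong((File dir) ->
                        totalLogBytesByDir.getOrDefault(dir, 0L)))
                .orElseThrow(() -> new IllegalStateException("No log directories configured"));
    }

    public static void main(String[] args) {
        List<File> dirs = Arrays.asList(new File("/data/kafka-logs-1"),
                                        new File("/data/kafka-logs-2"));
        Map<File, Long> sizes = new HashMap<>();
        sizes.put(dirs.get(0), 500L * 1024 * 1024 * 1024);
        sizes.put(dirs.get(1), 120L * 1024 * 1024 * 1024);
        System.out.println(nextLogDir(dirs, sizes)); // the less-used /data/kafka-logs-2
    }
}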

Since the existing policy makes little sense IMO, should it be changed 
straight away, or should we have an option to activate the correct behavior and 
keep the existing policy as the default?

--
Igor Soarez



Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Ryanne Dolan
Thanks Sönke, this is an area in which Kafka is really, really far behind.

I've built secure systems around Kafka as laid out in the KIP. One issue
that is not addressed in the KIP is re-encryption of records after a key
rotation. When a key is compromised, it's important that any data encrypted
using that key is immediately destroyed or re-encrypted with a new key.
Ideally first-class support for end-to-end encryption in Kafka would make
this possible natively, or else I'm not sure what the point would be. It
seems to me that the brokers would need to be involved in this process, so
perhaps a client-first approach will be painting ourselves into a corner.
Not sure.

Another issue is whether materialized tables, e.g. in Kafka Streams, would
see unencrypted or encrypted records. If we implemented the KIP as written,
it would still result in a bunch of plain text data in RocksDB everywhere.
Again, I'm not sure what the point would be. Perhaps using custom serdes
would actually be a more holistic approach, since Kafka Streams etc could
leverage these as well.

Similarly, if the whole record is encrypted, it becomes impossible to do
joins, group bys etc, which just need the record key and maybe don't have
access to the encryption key. Maybe only record _values_ should be
encrypted, and maybe Kafka Streams could defer decryption until the actual
value is inspected. That way joins etc are possible without the encryption
key, and RocksDB would not need to decrypt values before materializing to
disk.

This is why I've implemented encryption on a per-field basis, not at the
record level, when addressing kafka security in the past. And I've had to
build external pipelines that purge, re-encrypt, and re-ingest records when
keys are compromised.

This KIP might be a step in the right direction, not sure. But I'm hesitant
to support the idea of end-to-end encryption without a plan to address the
myriad other problems.

That said, we need this badly and I hope something shakes out.

Ryanne

On Tue, Apr 28, 2020, 6:26 PM Sönke Liebau
 wrote:

> All,
>
> I've asked for comments on this KIP in the past, but since I didn't really
> get any feedback I've decided to reduce the initial scope of the KIP a bit
> and try again.
>
> I have reworked to KIP to provide a limited, but useful set of features for
> this initial KIP and laid out a very rough roadmap of what I'd envision
> this looking like in a final version.
>
> I am aware that the KIP is currently light on implementation details, but
> would like to get some feedback on the general approach before fully
> speccing everything.
>
> The KIP can be found at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka
>
>
> I would very much appreciate any feedback!
>
> Best regards,
> Sönke
>


Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-05-07 Thread Jason Gustafson
Hi Jun,

Thanks for the comments. Here are a few quick responses.

> My question was on the auto generated broker id. Currently, the broker can
choose to have its broker Id auto generated. The generation is done through
ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is
auto generated. "quorum.voters" also can't be set statically if broker ids
are auto generated.

Yeah, I'm not sure I see a way for this to be supported during the initial
bootstrapping of the cluster. I had hoped we could leave it for a follow-up
proposal, but I think we need to have a section on broker registration
since there are some details to work out. Auto-generation of ids is one of
them. Another is the mechanism for registering broker state and how we
track liveness.

> Hmm, it's kind of weird to bump up the leader epoch before the new leader
is actually elected, right.

That is what the Raft protocol specifies. We bump the epoch upon becoming a
candidate. Once the epoch is bumped, we cannot go back to a previous epoch,
so there is no value remembering it. Perhaps it would be less confusing if
we used "epoch" instead of "leader epoch" consistently.

> Hmm, I thought only the quorum nodes have local logs and observers don't?

Observers also replicate and store the log.

Thanks,
Jason

On Mon, May 4, 2020 at 11:58 AM Jun Rao  wrote:

> Hi, Guozhang,
>
> Thanks for the reply. A few more replies inlined below.
>
> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang  wrote:
>
> > Hello Jun,
> >
> > Thanks for your comments! I'm replying inline below:
> >
> > On Fri, May 1, 2020 at 12:36 PM Jun Rao  wrote:
> >
> > > 101. Bootstrapping related issues.
> > > 101.1 Currently, we support auto broker id generation. Is this
> supported
> > > for bootstrap brokers?
> > >
> >
> > The vote ids would just be the broker ids. "bootstrap.servers" would be
> > similar to what client configs have today, where "quorum.voters" would be
> > pre-defined config values.
> >
> >
> My question was on the auto generated broker id. Currently, the broker can
> choose to have its broker Id auto generated. The generation is done through
> ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is
> auto generated. "quorum.voters" also can't be set statically if broker ids
> are auto generated.
>
>
> > > 102. Log compaction. One weak spot of log compaction is for the
> consumer
> > to
> > > deal with deletes. When a key is deleted, it's retained as a tombstone
> > > first and then physically removed. If a client misses the tombstone
> > > (because it's physically removed), it may not be able to update its
> > > metadata properly. The way we solve this in Kafka is based on a
> > > configuration (log.cleaner.delete.retention.ms) and we expect a
> consumer
> > > having seen an old key to finish reading the deletion tombstone within
> > that
> > > time. There is no strong guarantee for that since a broker could be
> down
> > > for a long time. It would be better if we can have a more reliable way
> of
> > > dealing with deletes.
> > >
> >
> > We propose to capture this in the "FirstDirtyOffset" field of the quorum
> > record fetch response: the offset is the maximum offset that log
> compaction
> > has reached up to. If the follower has fetched beyond this offset it
> means
> > itself is safe hence it has seen all records up to that offset. On
> getting
> > the response, the follower can then decide if its end offset actually
> below
> > that dirty offset (and hence may miss some tombstones). If that's the
> case:
> >
> > 1) Naively, it could re-bootstrap metadata log from the very beginning to
> > catch up.
> > 2) During that time, it would refrain itself from answering
> MetadataRequest
> > from any clients.
> >
> >
> I am not sure that the "FirstDirtyOffset" field fully addresses the issue.
> Currently, the deletion tombstone is not removed immediately after a round
> of cleaning. It's removed after a delay in a subsequent round of cleaning.
> Consider an example where a key insertion is at offset 200 and a deletion
> tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
> follower/observer fetches from offset 0  and fetches the key at offset 200.
> A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
> tombstone at 400 is physically removed. The follower/observer continues the
> fetch, but misses offset 400. It catches all the way to FirstDirtyOffset
> and declares its metadata as ready. However, its metadata could be stale
> since it actually misses the deletion of the key.
>
>
> > > 105. Quorum State: In addition to VotedId, do we need the epoch
> > > corresponding to VotedId? Over time, the same broker Id could be voted
> in
> > > different generations with different epoch.
> > >
> > >
> > Hmm, this is a good point. Originally I think the "LeaderEpoch" field in
> > that file is corresponding to the "latest known leader epoch", not the
> > "current leader epoch". For example, if the current epoch 

Build failed in Jenkins: kafka-trunk-jdk8 #4510

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)


--
[...truncated 3.06 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldGetInternalTopicNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldGetInternalTopicNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = fa

Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Tom Bentley
Hi Ryanne,

You raise some good points there.

Similarly, if the whole record is encrypted, it becomes impossible to do
> joins, group bys etc, which just need the record key and maybe don't have
> access to the encryption key. Maybe only record _values_ should be
> encrypted, and maybe Kafka Streams could defer decryption until the actual
> value is inspected. That way joins etc are possible without the encryption
> key, and RocksDB would not need to decrypt values before materializing to
> disk.
>

It's getting a bit late here, so maybe I overlooked something, but wouldn't
the natural thing be to make the "encrypted" key a hash of the
original key, and let the encrypted value be the cipher text
of the (original key, original value) pair? A scheme like this would
preserve equality of the key (strictly speaking there's a chance of
collision of course). I guess this could also be a solution for the
compacted topic issue Sönke mentioned.
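
For illustration, a minimal sketch of that scheme (my own sketch, not something
from the KIP; SHA-256 for the key digest and AES-GCM for the value envelope are
assumptions, and key/IV management is omitted):

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public final class HashedKeyEnvelope {

    // Deterministic "encrypted" key: a digest of the original key, so that
    // key equality (joins, group-bys, compaction) is preserved without the data key.
    public static byte[] hashKey(byte[] originalKey) throws Exception {
        return MessageDigest.getInstance("SHA-256").digest(originalKey);
    }

    // Encrypted value: ciphertext of the (original key, original value) pair,
    // length-prefixed so the consumer can split them again after decryption.
    public static byte[] encryptPair(byte[] originalKey, byte[] originalValue,
                                     SecretKey dataKey, byte[] iv) throws Exception {
        ByteBuffer plain = ByteBuffer.allocate(4 + originalKey.length + originalValue.length);
        plain.putInt(originalKey.length).put(originalKey).put(originalValue);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(128, iv));
        return cipher.doFinal(plain.array());
    }
}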

Cheers,

Tom



On Thu, May 7, 2020 at 5:17 PM Ryanne Dolan  wrote:

> Thanks Sönke, this is an area in which Kafka is really, really far behind.
>
> I've built secure systems around Kafka as laid out in the KIP. One issue
> that is not addressed in the KIP is re-encryption of records after a key
> rotation. When a key is compromised, it's important that any data encrypted
> using that key is immediately destroyed or re-encrypted with a new key.
> Ideally first-class support for end-to-end encryption in Kafka would make
> this possible natively, or else I'm not sure what the point would be. It
> seems to me that the brokers would need to be involved in this process, so
> perhaps a client-first approach will be painting ourselves into a corner.
> Not sure.
>
> Another issue is whether materialized tables, e.g. in Kafka Streams, would
> see unencrypted or encrypted records. If we implemented the KIP as written,
> it would still result in a bunch of plain text data in RocksDB everywhere.
> Again, I'm not sure what the point would be. Perhaps using custom serdes
> would actually be a more holistic approach, since Kafka Streams etc could
> leverage these as well.
>
> Similarly, if the whole record is encrypted, it becomes impossible to do
> joins, group bys etc, which just need the record key and maybe don't have
> access to the encryption key. Maybe only record _values_ should be
> encrypted, and maybe Kafka Streams could defer decryption until the actual
> value is inspected. That way joins etc are possible without the encryption
> key, and RocksDB would not need to decrypt values before materializing to
> disk.
>
> This is why I've implemented encryption on a per-field basis, not at the
> record level, when addressing kafka security in the past. And I've had to
> build external pipelines that purge, re-encrypt, and re-ingest records when
> keys are compromised.
>
> This KIP might be a step in the right direction, not sure. But I'm hesitant
> to support the idea of end-to-end encryption without a plan to address the
> myriad other problems.
>
> That said, we need this badly and I hope something shakes out.
>
> Ryanne
>
> On Tue, Apr 28, 2020, 6:26 PM Sönke Liebau
>  wrote:
>
> > All,
> >
> > I've asked for comments on this KIP in the past, but since I didn't
> really
> > get any feedback I've decided to reduce the initial scope of the KIP a
> bit
> > and try again.
> >
> > I have reworked to KIP to provide a limited, but useful set of features
> for
> > this initial KIP and laid out a very rough roadmap of what I'd envision
> > this looking like in a final version.
> >
> > I am aware that the KIP is currently light on implementation details, but
> > would like to get some feedback on the general approach before fully
> > speccing everything.
> >
> > The KIP can be found at
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka
> >
> >
> > I would very much appreciate any feedback!
> >
> > Best regards,
> > Sönke
> >
>


[jira] [Created] (KAFKA-9968) Newly subscribed topic not present in metadata request

2020-05-07 Thread Man Hin (Jira)
Man Hin created KAFKA-9968:
--

 Summary: Newly subscribed topic not present in metadata request
 Key: KAFKA-9968
 URL: https://issues.apache.org/jira/browse/KAFKA-9968
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.4.1, 2.5.0
Reporter: Man Hin


Our application subscribes to multiple topics one by one. It used to work fine. 
But after we upgraded our Kafka client version from 2.4.0 to 2.4.1, our 
application no longer receives messages for the last topic.

I spotted a warning log from Kafka client.
{code:java}
2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - 
[Consumer clientId=sample_consumer, groupId=sample_client] The following 
subscribed topics are not assigned to any members: [TopicX]  {code}
I'm able to reproduce it with a test case running against a live Kafka broker 
(we are using v2.4.1 broker).
{code:java}
@Test
public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws
        InterruptedException, ExecutionException, TimeoutException {
    // WHEN
    List<String> topics = new ArrayList<>();

    topics.add(TOPIC_C);
    consumer.subscribe(topics);
    consumer.poll(0);

    topics.add(TOPIC_B);
    consumer.subscribe(topics);
    consumer.poll(0);

    topics.add(TOPIC_A);
    consumer.subscribe(topics);
    consumer.poll(0);

    // THEN
    Set<TopicPartition> assignments = consumer.assignment();
    Set<String> topicSet = assignments.stream()
            .map(p -> p.topic()).distinct().collect(Collectors.toSet());
    logger.info("Topic: {}", topicSet);
    assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
}{code}
We turned on trace log and found that the metadata requests always missed the 
last topic we subscribed.
{code:java}
2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC
2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
clientId=sample_consumer, groupId=sample_client] Sending metadata request 
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
rebalanceTimeoutMs=30, memberId=
2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
rebalanceTimeoutMs=30, memberId=
2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
TopicC, TopicB
2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
clientId=sample_consumer, groupId=sample_client] Sending metadata request 
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
rebalanceTimeoutMs=30, memberId=
2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
TopicC, TopicB, TopicA
2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
clientId=sample_consumer, groupId=sample_client] Sending metadata request 
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), 
MetadataRequestTopic(name='TopicC')], allowAu
2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
rebalanceTimeoutMs=30, memberId=
{code}
I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription contains only 
the topics returned in the JoinGroup response. As a result, the newly 
subscribed topic is missing from the metadata request.

Up to 2.4.0 the behaviour was to add the topics from the JoinGroup response to 
groupSubscription, but in 2.4.1 this changed to replacing them. Maybe this is the cause. See 
SubscriptionState in 
https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c

I tried client v2.5.0 and got the same result.
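
For illustration, a workaround-style variant of the test above that subscribes to the 
full topic set in one call and then polls until an assignment arrives (a sketch only, 
using the same fixture and imports as above plus java.time.Duration; it does not address 
the underlying client change):
{code:java}
@Test
public void WHEN_subscribed_all_at_once_THEN_receive_assignment() {
    // Subscribe to the full topic set in one call instead of growing the
    // subscription one topic at a time.
    consumer.subscribe(Arrays.asList(TOPIC_C, TOPIC_B, TOPIC_A));

    // Poll until the group rebalance completes and an assignment is received.
    while (consumer.assignment().isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }

    Set<String> topicSet = consumer.assignment().stream()
            .map(TopicPartition::topic).collect(Collectors.toSet());
    assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
}{code}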



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Tom Bentley
Hi again,

Of course I was overlooking at least one thing. Anyone who could guess the
record keys could hash them and compare. To make it work the producer and
consumer would need a shared secret to include in the hash computation. But
the key management service could furnish them with this in addition to the
encryption/decryption keys, so I think it should still work. Unless I've
overlooked something else.
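
As a minimal sketch of that keyed-hash variant (again my own illustration, not from the
KIP; HMAC-SHA256 with the shared secret as the MAC key is an assumption):

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public final class KeyedKeyHash {
    // Keyed hash of the original record key: equality is preserved, but the
    // hash cannot be reproduced by guessing keys without the shared secret.
    public static byte[] keyedHash(byte[] originalKey, byte[] sharedSecret) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(sharedSecret, "HmacSHA256"));
        return mac.doFinal(originalKey);
    }
}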

Cheers,

Tom

On Thu, May 7, 2020 at 6:04 PM Tom Bentley  wrote:

> Hi Rayanne,
>
> You raise some good points there.
>
> Similarly, if the whole record is encrypted, it becomes impossible to do
>> joins, group bys etc, which just need the record key and maybe don't have
>> access to the encryption key. Maybe only record _values_ should be
>> encrypted, and maybe Kafka Streams could defer decryption until the actual
>> value is inspected. That way joins etc are possible without the encryption
>> key, and RocksDB would not need to decrypt values before materializing to
>> disk.
>>
>
> It's getting a bit late here, so maybe I overlooked something, but
> wouldn't the natural thing to do be to make the "encrypted" key a hash of
> the original key, and let the value of the encrypted value be the cipher
> text of the (original key, original value) pair. A scheme like this would
> preserve equality of the key (strictly speaking there's a chance of
> collision of course). I guess this could also be a solution for the
> compacted topic issue Sönke mentioned.
>
> Cheers,
>
> Tom
>
>
>
> On Thu, May 7, 2020 at 5:17 PM Ryanne Dolan  wrote:
>
>> Thanks Sönke, this is an area in which Kafka is really, really far behind.
>>
>> I've built secure systems around Kafka as laid out in the KIP. One issue
>> that is not addressed in the KIP is re-encryption of records after a key
>> rotation. When a key is compromised, it's important that any data
>> encrypted
>> using that key is immediately destroyed or re-encrypted with a new key.
>> Ideally first-class support for end-to-end encryption in Kafka would make
>> this possible natively, or else I'm not sure what the point would be. It
>> seems to me that the brokers would need to be involved in this process, so
>> perhaps a client-first approach will be painting ourselves into a corner.
>> Not sure.
>>
>> Another issue is whether materialized tables, e.g. in Kafka Streams, would
>> see unencrypted or encrypted records. If we implemented the KIP as
>> written,
>> it would still result in a bunch of plain text data in RocksDB everywhere.
>> Again, I'm not sure what the point would be. Perhaps using custom serdes
>> would actually be a more holistic approach, since Kafka Streams etc could
>> leverage these as well.
>>
>> Similarly, if the whole record is encrypted, it becomes impossible to do
>> joins, group bys etc, which just need the record key and maybe don't have
>> access to the encryption key. Maybe only record _values_ should be
>> encrypted, and maybe Kafka Streams could defer decryption until the actual
>> value is inspected. That way joins etc are possible without the encryption
>> key, and RocksDB would not need to decrypt values before materializing to
>> disk.
>>
>> This is why I've implemented encryption on a per-field basis, not at the
>> record level, when addressing kafka security in the past. And I've had to
>> build external pipelines that purge, re-encrypt, and re-ingest records
>> when
>> keys are compromised.
>>
>> This KIP might be a step in the right direction, not sure. But I'm
>> hesitant
>> to support the idea of end-to-end encryption without a plan to address the
>> myriad other problems.
>>
>> That said, we need this badly and I hope something shakes out.
>>
>> Ryanne
>>
>> On Tue, Apr 28, 2020, 6:26 PM Sönke Liebau
>>  wrote:
>>
>> > All,
>> >
>> > I've asked for comments on this KIP in the past, but since I didn't
>> really
>> > get any feedback I've decided to reduce the initial scope of the KIP a
>> bit
>> > and try again.
>> >
>> > I have reworked to KIP to provide a limited, but useful set of features
>> for
>> > this initial KIP and laid out a very rough roadmap of what I'd envision
>> > this looking like in a final version.
>> >
>> > I am aware that the KIP is currently light on implementation details,
>> but
>> > would like to get some feedback on the general approach before fully
>> > speccing everything.
>> >
>> > The KIP can be found at
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka
>> >
>> >
>> > I would very much appreciate any feedback!
>> >
>> > Best regards,
>> > Sönke
>> >
>>
>


Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-07 Thread Ryanne Dolan
Tom, good point, I've done exactly that -- hashing record keys -- but it's
unclear to me what should happen when the hash key must be rotated. In my
case the (external) solution involved rainbow tables, versioned keys, and
custom materializers that were aware of older keys for each record.

In particular I had a pipeline that would re-key records and re-ingest
them, while opportunistically overwriting records materialized with the old
key.

For a native solution I think maybe we'd need to carry around any old
versions of each record key, perhaps as metadata. Then brokers and
materializers can compact records based on _any_ overlapping key, maybe?
Not sure.
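
A very rough sketch of what "old key versions as metadata" could look like on the
producer side (purely illustrative; the header name is made up and nothing like this
exists today):

import org.apache.kafka.clients.producer.ProducerRecord;

public final class OldKeyMetadata {
    // Attach the key hash computed under the previous hashing secret as a
    // record header, so brokers/materializers that still know the old hash
    // could treat it as an overlapping key during compaction or lookups.
    public static ProducerRecord<byte[], byte[]> withOldKeyHash(
            ProducerRecord<byte[], byte[]> record, byte[] oldKeyHash) {
        record.headers().add("x-old-key-hash", oldKeyHash);
        return record;
    }
}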

Ryanne

On Thu, May 7, 2020, 12:05 PM Tom Bentley  wrote:

> Hi Rayanne,
>
> You raise some good points there.
>
> Similarly, if the whole record is encrypted, it becomes impossible to do
> > joins, group bys etc, which just need the record key and maybe don't have
> > access to the encryption key. Maybe only record _values_ should be
> > encrypted, and maybe Kafka Streams could defer decryption until the
> actual
> > value is inspected. That way joins etc are possible without the
> encryption
> > key, and RocksDB would not need to decrypt values before materializing to
> > disk.
> >
>
> It's getting a bit late here, so maybe I overlooked something, but wouldn't
> the natural thing to do be to make the "encrypted" key a hash of the
> original key, and let the value of the encrypted value be the cipher text
> of the (original key, original value) pair. A scheme like this would
> preserve equality of the key (strictly speaking there's a chance of
> collision of course). I guess this could also be a solution for the
> compacted topic issue Sönke mentioned.
>
> Cheers,
>
> Tom
>
>
>
> On Thu, May 7, 2020 at 5:17 PM Ryanne Dolan  wrote:
>
> > Thanks Sönke, this is an area in which Kafka is really, really far
> behind.
> >
> > I've built secure systems around Kafka as laid out in the KIP. One issue
> > that is not addressed in the KIP is re-encryption of records after a key
> > rotation. When a key is compromised, it's important that any data
> encrypted
> > using that key is immediately destroyed or re-encrypted with a new key.
> > Ideally first-class support for end-to-end encryption in Kafka would make
> > this possible natively, or else I'm not sure what the point would be. It
> > seems to me that the brokers would need to be involved in this process,
> so
> > perhaps a client-first approach will be painting ourselves into a corner.
> > Not sure.
> >
> > Another issue is whether materialized tables, e.g. in Kafka Streams,
> would
> > see unencrypted or encrypted records. If we implemented the KIP as
> written,
> > it would still result in a bunch of plain text data in RocksDB
> everywhere.
> > Again, I'm not sure what the point would be. Perhaps using custom serdes
> > would actually be a more holistic approach, since Kafka Streams etc could
> > leverage these as well.
> >
> > Similarly, if the whole record is encrypted, it becomes impossible to do
> > joins, group bys etc, which just need the record key and maybe don't have
> > access to the encryption key. Maybe only record _values_ should be
> > encrypted, and maybe Kafka Streams could defer decryption until the
> actual
> > value is inspected. That way joins etc are possible without the
> encryption
> > key, and RocksDB would not need to decrypt values before materializing to
> > disk.
> >
> > This is why I've implemented encryption on a per-field basis, not at the
> > record level, when addressing kafka security in the past. And I've had to
> > build external pipelines that purge, re-encrypt, and re-ingest records
> when
> > keys are compromised.
> >
> > This KIP might be a step in the right direction, not sure. But I'm
> hesitant
> > to support the idea of end-to-end encryption without a plan to address
> the
> > myriad other problems.
> >
> > That said, we need this badly and I hope something shakes out.
> >
> > Ryanne
> >
> > On Tue, Apr 28, 2020, 6:26 PM Sönke Liebau
> >  wrote:
> >
> > > All,
> > >
> > > I've asked for comments on this KIP in the past, but since I didn't
> > really
> > > get any feedback I've decided to reduce the initial scope of the KIP a
> > bit
> > > and try again.
> > >
> > > I have reworked to KIP to provide a limited, but useful set of features
> > for
> > > this initial KIP and laid out a very rough roadmap of what I'd envision
> > > this looking like in a final version.
> > >
> > > I am aware that the KIP is currently light on implementation details,
> but
> > > would like to get some feedback on the general approach before fully
> > > speccing everything.
> > >
> > > The KIP can be found at
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka
> > >
> > >
> > > I would very much appreciate any feedback!
> > >
> > > Best regards,
> > > Sönke
> > >
> >
>


[VOTE] KIP-605 - Expand Connect Worker Internal Topic Settings

2020-05-07 Thread Randall Hauch
I'd like to open the vote for KIP-605:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings

This is relatively straightforward, and the discussion thread had just a
few suggestions that have already been incorporated into the KIP.

Best regards,

Randall


[DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-07 Thread Aakash Shah
Hello all,

I've created a KIP to handle error reporting for records in sink
connectors, specifically within the context of put(...):

https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

I would appreciate any kind of feedback.

Thanks,

Aakash


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-07 Thread Andrew Schofield
Hi,
Thanks for the KIP.

I wonder whether this idea would be better implemented using a new method on the
SinkTaskContext.

  public void putFailed(Collection<SinkRecord> records)

Then the rest of KIP-298 could apply. Failed records could be put to the DLQ or 
logged, as
appropriate. I think there's real value in keeping the rest of the error 
handling intact and using
the same mechanism for failed puts as for failures in the other stages of 
processing. If you want
to retry a batch, throw a RetriableException. If the record cannot be processed 
perhaps due to
a size limit in the target system, flag it as failed and the framework can 
invoke its error handling
code on the offending record or records. The only stipulation is that the error 
action needs to be
completed before the offset for a failed record is committed or it might be 
lost.
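
To make that concrete, here is a sketch of how a sink task's put(...) might use such a
method (putFailed is only the method proposed above, not an existing API; targetClient
and isPermanentFailure are made-up placeholders):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;

// Inside a SinkTask implementation (targetClient and isPermanentFailure are placeholders):
@Override
public void put(Collection<SinkRecord> records) {
    List<SinkRecord> failed = new ArrayList<>();
    for (SinkRecord record : records) {
        try {
            targetClient.write(record);            // placeholder call to the target system
        } catch (Exception e) {
            if (isPermanentFailure(e)) {           // placeholder check, e.g. record too large for the target
                failed.add(record);                // flag for the framework's error handling
            } else {
                throw new RetriableException(e);   // transient failure: retry the whole batch
            }
        }
    }
    if (!failed.isEmpty()) {
        context.putFailed(failed);                 // proposed method; framework applies KIP-298 handling (DLQ/log)
    }
}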

Thanks,
Andrew

On 07/05/2020, 19:09, "Aakash Shah"  wrote:

Hello all,

I've created a KIP to handle error reporting for records in sink
connectors, specifically within the context of put(...):


https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

I would appreciate any kind of feedback.

Thanks,

Aakash



Build failed in Jenkins: kafka-trunk-jdk11 #1433

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)


--
[...truncated 6.16 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldGetSinkTopicNames[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldGetSinkTopicNames[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThro

Build failed in Jenkins: kafka-trunk-jdk14 #64

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)


--
[...truncated 6.16 MB...]
org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
STARTED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName PASSED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics STARTED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver PASSED

org.apache.kafka.streams.TestTopicsTest > testValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testValueList PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordList PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testInputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testInputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

> Task :streams:upgrade-system-tests-0110:processT

[jira] [Created] (KAFKA-9969) ConnectorClientConfigRequest is loaded in isolation and throws LinkageError

2020-05-07 Thread Greg Harris (Jira)
Greg Harris created KAFKA-9969:
--

 Summary: ConnectorClientConfigRequest is loaded in isolation and 
throws LinkageError
 Key: KAFKA-9969
 URL: https://issues.apache.org/jira/browse/KAFKA-9969
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.4.1, 2.5.0
Reporter: Greg Harris
Assignee: Greg Harris


ConnectorClientConfigRequest (added by 
[KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy])
 is a class in connect-api, and should always be loaded by the system 
classloader. If a plugin packages the connect-api jar, the REST API may fail 
with the following stacktrace:

{noformat}
java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest"
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:416)
 at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
{noformat}

It appears that the other class in org.apache.kafka.connect.connector.policy, 
ConnectorClientConfigOverridePolicy had a similar issue in KAFKA-8415, and 
received a fix.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9970) Kafka 2.4.0 crashes under Windows when client is restarted

2020-05-07 Thread SledgeHammer (Jira)
SledgeHammer created KAFKA-9970:
---

 Summary: Kafka 2.4.0 crashes under Windows when client is restarted
 Key: KAFKA-9970
 URL: https://issues.apache.org/jira/browse/KAFKA-9970
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.0
Reporter: SledgeHammer


Windows 10 x64 Pro

JDK 11.0.7

Zookeeper 3.6.0

Kafka 2.12-2.4.0

 

I have reproduced this scenario on multiple machines. I do development on my Windows 
box, so I have ZooKeeper and Kafka running locally. On my work PC I leave ZooKeeper and 
Kafka running in command windows "forever"; at home I shut them down when I'm not doing 
development.

In either case, a Spring Boot client is continuously started and restarted. 
Intermittently Kafka will crash and corrupt the logs (I'm not able to capture 
the Kafka crash exception since the window closes), but upon restart I get the exception 
below. NOTE: the file is not actually in use, since I can delete the logs directory and 
then restart. The client uses both streams and classic queues.

 

[2020-05-07 13:38:27,782] ERROR Failed to clean up log for 
__consumer_offsets-20 in dir C:\PROGRA~1\kafka_2.12-2.4.0\logs due to 
IOException (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: C:\PROGRA~1\kafka_2.12-2.4.0\logs\__consumer_offsets-20\.timeindex.cleaned -> C:\PROGRA~1\kafka_2.12-2.4.0\logs\__consumer_offsets-20\.timeindex.swap: The process cannot access the file because it is being used by another process.
 at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
 at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
 at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
 at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
 at java.base/java.nio.file.Files.move(Files.java:1421)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
 at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
 at kafka.log.Log.$anonfun$replaceSegments$4(Log.scala:2267)
 at kafka.log.Log.$anonfun$replaceSegments$4$adapted(Log.scala:2267)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at kafka.log.Log.replaceSegments(Log.scala:2267)
 at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:604)
 at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:529)
 at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:528)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at kafka.log.Cleaner.doClean(LogCleaner.scala:528)
 at kafka.log.Cleaner.clean(LogCleaner.scala:502)
 at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)
 at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)
 at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk8 #4511

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Log4j Improvements on Fetcher (#8629)


--
[...truncated 3.06 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldGetInternalTopicNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldGetInternalTopicNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.t

Re: [DISCUSS] Kafka 3.0

2020-05-07 Thread Colin McCabe
On Wed, May 6, 2020, at 21:40, Ryanne Dolan wrote:
> > This will allow us to get an "alpha" version of the KIP-500 mode out
> > early for people to experiment with
> 
> I think this is a non-sequitur. It's not a requirement that KIP-500 be
> merged to master and released in order for people to experiment with it.
>

Hi Ryanne,

I agree that it is not a requirement that KIP-500 be merged to master in order 
for people to experiment with it.  The main reason for merging to master is to 
avoid maintaining lots of branches and doing lots of backports.

>
> Nor does it sound great to call for a major release (3.0) in order to get
> an "alpha version ... out early".
> 

As I said earlier, the reason for the new major release is to make certain 
incompatible changes, not in order to get an alpha version of KIP-500 out.  For 
example, dropping the zookeeper flags is a step forward for security and 
encapsulation which also advances KIP-500.  Another example is that removing 
the kafka-preferred-replica-election.sh command removes a duplicate command 
that has been deprecated for a while.

best,
Colin

> 
> On Wed, May 6, 2020 at 10:52 PM Colin McCabe  wrote:
> 
> > On Mon, May 4, 2020, at 17:12, Ryanne Dolan wrote:
> > > Hey Colin, I think we should wait until after KIP-500's "bridge
> > > release" so there is a clean break from Zookeeper after 3.0. The
> > > bridge release by definition is an attempt to not break anything, so
> > > it theoretically doesn't warrant a major release.
> >
> > Hi Ryanne,
> >
> > I think it's important to clarify this a little bit.  The bridge release
> > (really, releases, plural) allow you to upgrade from a cluster that is
> > using ZooKeeper to one that is not using ZooKeeper.  But, that doesn't
> > imply that the bridge release itself doesn't break anything.  Upgrading
> > to the bridge release itself might involve some minor incompatibility.
> >
> > Kafka does occasionally have incompatible changes.  In those cases, we
> > bump the major version number.  One example is that when we went from
> > Kafka 1.x to Kafka 2.0, we dropped support for JDK7.  This is an
> > incompatible change.
> >
> > In fact, we know that the bridge release will involve at least one
> > incompatible change.  We will need to drop support for the --zookeeper
> > flags in the command-line tools.
> >
> > We've been preparing for this change for a long time.  People have spent
> > a lot of effort designing new APIs that can be used instead of the old
> > zookeeper-based code that some of the command-line tools used.  We have
> > also deprecated the old ZK-based flags.  But at the end of the day, it
> > is still an incompatible change.  So it's unfortunately not possible for
> > the
> > bridge release to be a 2.x release.
> >
> > > If that's not the case (i.e. if a single "bridge release" turns out to
> > > be impractical), we should consider forking 3.0 while maintaining a
> > > line of Zookeeper-dependent Kafka in 2.x. That way 3.x can evolve
> > > dramatically without breaking the 2.x line. In particular, anything
> > > related to removing Zookeeper could land in pre-3.0 while every other
> > > feature targets 2.6.
> >
> > Just to be super clear about this, what we want to do here is support
> > operating in __either__ KIP-500 mode and legacy mode for a while.  So the
> > same branch will have support for both the old way and the new way of
> > managing metadata.
> >
> > This will allow us to get an "alpha" version of the KIP-500 mode out early
> > for people to experiment with.  It also greatly reduces the number of Kafka
> > releases we have to make, and the amount of backporting we have to do.
> >
> > >
> > > If you are proposing 2.6 should be the "bridge release", I think this
> > > is premature given Kafka's time-based release schedule. If the bridge
> > > features happen to be merged before 2.6's feature freeze, then sure --
> > > let's make that the bridge release in retrospect. And if we get all
> > > the post-Zookeeper features merged before 2.7, I'm onboard with naming
> > > it "3.0" instead.
> > >
> > > That said, we should aim to remove legacy MirrorMaker before 3.0 as
> > > well. I'm happy to drive that additional breaking change. Maybe 2.6
> > > can be the "bridge" for MM2 as well.
> >
> > I don't have a strong opinion either way about this, but if we want to
> > remove the original MirrorMaker, we have to deprecate it first, right?  Are
> > we ready to do that?
> >
> > best,
> > Colin
> >
> > >
> > > Ryanne
> > >
> > > On Mon, May 4, 2020, 5:05 PM Colin McCabe  wrote:
> > >
> > > > Hi all,
> > > >
> > > > We've had a few proposals recently for incompatible changes.  One of
> > > > them is my KIP-604: Remove ZooKeeper Flags from the Administrative
> > > > Tools.  The other is Boyang's KIP-590: Redirect ZK Mutation
> > > > Protocols to the Controller.  I think it's time to start thinking
> > > > about Kafka 3.0. Specifically, I think we should move to 3.0 after
> > > > the 2.6 release.
> > > >
> 

Re: [VOTE] KIP-605 - Expand Connect Worker Internal Topic Settings

2020-05-07 Thread Christopher Egerton
LGTM, +1 (non-binding)

Thanks Randall!

On Thu, May 7, 2020 at 11:01 AM Randall Hauch  wrote:

> I'd like to open the vote for KIP-605:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings
>
> This is relatively straightforward, and the discussion thread had just a
> few suggestions that have already been incorporated into the KIP.
>
> Best regards,
>
> Randall
>


Build failed in Jenkins: kafka-trunk-jdk11 #1434

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Log4j Improvements on Fetcher (#8629)


--
[...truncated 3.08 MB...]

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

org.apache

Build failed in Jenkins: kafka-trunk-jdk14 #65

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Log4j Improvements on Fetcher (#8629)


--
[...truncated 3.08 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > shouldGetSinkTopicNames[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] ST

Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-07 Thread Cheng Tan
Thinking more about the potential wider use cases, I have modified the proposal to 
apply to all connections. Thanks.
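
For illustration, such a timeout would be supplied like any other client configuration
(a sketch only; the property name and the value below follow the KIP's proposal and may
still change):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public final class ConnectionSetupTimeoutExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        // Client-side socket connection setup timeout as proposed by KIP-601;
        // the name and the 10s value here are illustrative, the KIP defines
        // the final name and default.
        props.put("socket.connection.setup.timeout.ms", "10000");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}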

- Best, - Cheng Tan

> On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
> 
> Hi Colin,
> 
> Sorry for the confusion. I’m proposing to implement the timeout in 
> NetworkClient.leastLoadedNode() when iterating all the cached nodes. The 
> alternative I can think of is to implement the timeout in NetworkClient.poll().
> 
> I’d prefer to implement it in leastLoadedNode(). Here are the reasons:
> Usually when clients send a request, they ask the network client to 
> send the request to a specific node. In this case, the 
> connection.setup.timeout won’t matter too much because the client doesn’t 
> want to try other nodes for that specific request; the request-level timeout 
> would be enough. The metadata fetcher fetches the nodes' status periodically, 
> so the clients can reassign the request to another node after a timeout.
> Consumer, producer, and AdminClient all use leastLoadedNode() for 
> metadata fetches, where the connection setup timeout can play an important 
> role. Unlike other requests, which can refer to the metadata for node condition, 
> metadata requests can only blindly choose a node for retry in the worst 
> scenario. We want to make sure the client can get the metadata smoothly and 
> as soon as possible. As a result, we need this connection.setup.timeout.
> Implementing the timeout in poll() or anywhere else might need an extra 
> iteration over all nodes, which might degrade network client performance.
> I also updated the KIP content and KIP status. Please let me know if the 
> above ideas make sense. Thanks.
> 
> Best, - Cheng Tan
> 
> 
> 
>> On May 4, 2020, at 5:26 PM, Colin McCabe > > wrote:
>> 
>> Hi Cheng,
>> 
>> On the KIP page, it lists this KIP as "draft."  It seems like "under 
>> discussion" is appropriate here, right?
>> 
>>> Currently, the initial socket connection timeout depends on the Linux 
>>> kernel setting
>>> tcp_syn_retries. The timeout value is 2 ^ (tcp_syn_retries + 1) - 1 
>>> seconds. For the
>>> reasons below, we want to control the client-side socket timeout directly 
>>> using 
>>> configuration files
>> 
>> Linux is just one example of an OS that Kafka could run on, right?  You 
>> could also be running on MacOS, for example.
>> 
>>> I'm proposing to do a lazy socket connection time out. That is, we only 
>>> check if
>>> we need to timeout a socket when we consider the corresponding node as a 
>>> candidate in the node provider.
>> 
>> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't we 
>> implement a connection setup timeout for all clients, not just AdminClient?
>> 
>> best,
>> Colin
>> 
>> On Mon, May 4, 2020, at 13:18, Colin McCabe wrote:
>>> Hmm.  A big part of the reason behind the KIP is that the default 
>>> connection timeout behavior of the OS doesn't work for Kafka, right?  
>>> For example, on Linux, if we wait 127 seconds for a connection attempt 
>>> to time out, we won't get a chance to make another attempt in most 
>>> cases.  So I think it makes sense to set a shorter default.
>>> 
>>> best,
>>> Colin
>>> 
>>> 
>>> On Mon, May 4, 2020, at 09:44, Jose Garcia Sancio wrote:
 Thanks for the KIP Cheng,
 
> The default value will be 10 seconds.
 
 I think we should make the default the current behavior. Meaning the
 default should leverage the default connect timeout from the operating
 system.
 
> Proposed Changes
 
 I don't fully understand this section. It seems like it is mainly
 focused on the problem with the current implementation. Can you
 explain how the proposed changes solve the problem?
 
 Thanks.
 
 
 -- 
 -Jose
 
>>> 
> 
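
As an aside on the OS-level behavior quoted above: with the common Linux default
of tcp_syn_retries = 6 (which is not universal), the formula works out to the
127 seconds Colin mentions. A quick worked example:

    public class SynRetryTimeout {
        public static void main(String[] args) {
            int tcpSynRetries = 6;                                  // common Linux default; not universal
            long timeoutSeconds = (1L << (tcpSynRetries + 1)) - 1;  // 2 ^ (retries + 1) - 1
            System.out.println(timeoutSeconds + " seconds");        // prints "127 seconds"
        }
    }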



Re: [DISCUSS] KIP-606: Add Metadata Context to MetricsReporter

2020-05-07 Thread Gwen Shapira
This would be useful, thank you :)

On Tue, May 5, 2020 at 4:58 PM Xavier Léauté  wrote:

> Hi Everyone,
>
> I've published a KIP to address some shortcoming of our current metrics
> reporter interface. Would appreciate feedback.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter
>
> Thank you,
> Xavier
>


-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-07 Thread Jose Garcia Sancio
Cheng,

Thanks for the KIP and the detailed proposal section. LGTM!

On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
>
> After thinking more about the potential wider use cases, I modified the proposal to 
> target all connections. Thanks.
>
> - Best, - Cheng Tan
>
> > On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
> >
> > Hi Colin,
> >
> > Sorry for the confusion. I’m proposing to implement the timeout in 
> > NetworkClient.leastLoadedNode() when iterating all the cached nodes. The 
> > alternative I can think of is to implement the timeout in NetworkClient.poll().
> >
> > I’d prefer to implement it in leastLoadedNode(). Here are the reasons:
> > Usually when clients send a request, they ask the network client to 
> > send the request to a specific node. In this case, the 
> > connection.setup.timeout won’t matter too much because the client doesn’t 
> > want to try other nodes for that specific request; the request-level 
> > timeout would be enough. The metadata fetcher fetches the nodes' status 
> > periodically, so the clients can reassign the request to another node after 
> > a timeout.
> > Consumer, producer, and AdminClient all use leastLoadedNode() for 
> > metadata fetches, where the connection setup timeout can play an important 
> > role. Unlike other requests, which can refer to the metadata for node condition, 
> > metadata requests can only blindly choose a node for retry in the worst 
> > scenario. We want to make sure the client can get the metadata smoothly and 
> > as soon as possible. As a result, we need this connection.setup.timeout.
> > Implementing the timeout in poll() or anywhere else might need an extra 
> > iteration over all nodes, which might degrade network client 
> > performance.
> > I also updated the KIP content and KIP status. Please let me know if the 
> > above ideas make sense. Thanks.
> >
> > Best, - Cheng Tan
> >
> >
> >
> >> On May 4, 2020, at 5:26 PM, Colin McCabe  >> > wrote:
> >>
> >> Hi Cheng,
> >>
> >> On the KIP page, it lists this KIP as "draft."  It seems like "under 
> >> discussion" is appropriate here, right?
> >>
> >>> Currently, the initial socket connection timeout depends on the Linux 
> >>> kernel setting
> >>> tcp_syn_retries. The timeout value is 2 ^ (tcp_syn_retries + 1) - 1 
> >>> seconds. For the
> >>> reasons below, we want to control the client-side socket timeout directly 
> >>> using
> >>> configuration files
> >>
> >> Linux is just one example of an OS that Kafka could run on, right?  You 
> >> could also be running on MacOS, for example.
> >>
> >>> I'm proposing to do a lazy socket connection time out. That is, we only 
> >>> check if
> >>> we need to timeout a socket when we consider the corresponding node as a
> >>> candidate in the node provider.
> >>
> >> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't we 
> >> implement a connection setup timeout for all clients, not just AdminClient?
> >>
> >> best,
> >> Colin
> >>
> >> On Mon, May 4, 2020, at 13:18, Colin McCabe wrote:
> >>> Hmm.  A big part of the reason behind the KIP is that the default
> >>> connection timeout behavior of the OS doesn't work for Kafka, right?
> >>> For example, on Linux, if we wait 127 seconds for a connection attempt
> >>> to time out, we won't get a chance to make another attempt in most
> >>> cases.  So I think it makes sense to set a shorter default.
> >>>
> >>> best,
> >>> Colin
> >>>
> >>>
> >>> On Mon, May 4, 2020, at 09:44, Jose Garcia Sancio wrote:
>  Thanks for the KIP Cheng,
> 
> > The default value will be 10 seconds.
> 
>  I think we should make the default the current behavior. Meaning the
>  default should leverage the default connect timeout from the operating
>  system.
> 
> > Proposed Changes
> 
>  I don't fully understand this section. It seems like it is mainly
>  focused on the problem with the current implementation. Can you
>  explain how the proposed changes solve the problem?
> 
>  Thanks.
> 
> 
>  --
>  -Jose
> 
> >>>
> >
>


-- 
-Jose


Re: [DISCUSS] Kafka 3.0

2020-05-07 Thread Colin McCabe
On Wed, May 6, 2020, at 21:33, Ryanne Dolan wrote:
> > In fact, we know that the bridge release will involve at least one
> > incompatible change.  We will need to drop support for the --zookeeper
> > flags in the command-line tools.
> 
> If the bridge release(s) and the subsequent post-ZK release are _both_
> breaking changes, I think we only have one option: the 3.x line are the
> bridge release(s), and ZK is removed in 4.0, as suggested by Andrew
> Schofield.
> 
> Specifically:
> - in order to _remove_ (not merely deprecate) the --zookeeper args, we will
> need a major release.
> - in order to drop support for ZK entirely (e.g. break a bunch of external
> tooling like Cruise Control), we will need a major release.
> 
> I count two major releases.

Hi Ryanne,

I agree that dropping ZK completely will need a new major release after 3.0.  I 
think that's OK and in keeping with how we've handled deprecation and removal 
in the past.  It's important for users to have a smooth upgrade path.

best,
Colin

> 
> Ryanne
> 
> -
> 
> On Wed, May 6, 2020 at 10:52 PM Colin McCabe  wrote:
> 
> > On Mon, May 4, 2020, at 17:12, Ryanne Dolan wrote:
> > > Hey Colin, I think we should wait until after KIP-500's "bridge
> > > release" so there is a clean break from Zookeeper after 3.0. The
> > > bridge release by definition is an attempt to not break anything, so
> > > it theoretically doesn't warrant a major release.
> >
> > Hi Ryanne,
> >
> > I think it's important to clarify this a little bit.  The bridge release
> > (really, releases, plural) allow you to upgrade from a cluster that is
> > using ZooKeeper to one that is not using ZooKeeper.  But, that doesn't
> > imply that the bridge release itself doesn't break anything.  Upgrading
> > to the bridge release itself might involve some minor incompatibility.
> >
> > Kafka does occasionally have incompatible changes.  In those cases, we
> > bump the major version number.  One example is that when we went from
> > Kafka 1.x to Kafka 2.0, we dropped support for JDK7.  This is an
> > incompatible change.
> >
> > In fact, we know that the bridge release will involve at least one
> > incompatible change.  We will need to drop support for the --zookeeper
> > flags in the command-line tools.
> >
> > We've been preparing for this change for a long time.  People have spent
> > a lot of effort designing new APIs that can be used instead of the old
> > zookeeper-based code that some of the command-line tools used.  We have
> > also deprecated the old ZK-based flags.  But at the end of the day, it
> > is still an incompatible change.  So it's unfortunately not possible for
> > the
> > bridge release to be a 2.x release.
> >
> > > If that's not the case (i.e. if a single "bridge release" turns out to
> > > be impractical), we should consider forking 3.0 while maintaining a
> > > line of Zookeeper-dependent Kafka in 2.x. That way 3.x can evolve
> > > dramatically without breaking the 2.x line. In particular, anything
> > > related to removing Zookeeper could land in pre-3.0 while every other
> > > feature targets 2.6.
> >
> > Just to be super clear about this, what we want to do here is support
> > operating in __either__ KIP-500 mode and legacy mode for a while.  So the
> > same branch will have support for both the old way and the new way of
> > managing metadata.
> >
> > This will allow us to get an "alpha" version of the KIP-500 mode out early
> > for people to experiment with.  It also greatly reduces the number of Kafka
> > releases we have to make, and the amount of backporting we have to do.
> >
> > >
> > > If you are proposing 2.6 should be the "bridge release", I think this
> > > is premature given Kafka's time-based release schedule. If the bridge
> > > features happen to be merged before 2.6's feature freeze, then sure --
> > > let's make that the bridge release in retrospect. And if we get all
> > > the post-Zookeeper features merged before 2.7, I'm onboard with naming
> > > it "3.0" instead.
> > >
> > > That said, we should aim to remove legacy MirrorMaker before 3.0 as
> > > well. I'm happy to drive that additional breaking change. Maybe 2.6
> > > can be the "bridge" for MM2 as well.
> >
> > I don't have a strong opinion either way about this, but if we want to
> > remove the original MirrorMaker, we have to deprecate it first, right?  Are
> > we ready to do that?
> >
> > best,
> > Colin
> >
> > >
> > > Ryanne
> > >
> > > On Mon, May 4, 2020, 5:05 PM Colin McCabe  wrote:
> > >
> > > > Hi all,
> > > >
> > > > We've had a few proposals recently for incompatible changes.  One of
> > > > them is my KIP-604: Remove ZooKeeper Flags from the Administrative
> > > > Tools.  The other is Boyang's KIP-590: Redirect ZK Mutation
> > > > Protocols to the Controller.  I think it's time to start thinking
> > > > about Kafka 3.0. Specifically, I think we should move to 3.0 after
> > > > the 2.6 release.
> > > >
> > > > From the perspective of KIP-500, in Kafka 3.x we'd like to make
> > > 

Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-05-07 Thread Guozhang Wang
Thanks Jun. Further replies are in-lined.

On Mon, May 4, 2020 at 11:58 AM Jun Rao  wrote:

> Hi, Guozhang,
>
> Thanks for the reply. A few more replies inlined below.
>
> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang  wrote:
>
> > Hello Jun,
> >
> > Thanks for your comments! I'm replying inline below:
> >
> > On Fri, May 1, 2020 at 12:36 PM Jun Rao  wrote:
> >
> > > 101. Bootstrapping related issues.
> > > 101.1 Currently, we support auto broker id generation. Is this
> supported
> > > for bootstrap brokers?
> > >
> >
> > The voter ids would just be the broker ids. "bootstrap.servers" would be
> > similar to what client configs have today, where "quorum.voters" would be
> > pre-defined config values.
> >
> >
> My question was on the auto generated broker id. Currently, the broker can
> choose to have its broker Id auto generated. The generation is done through
> ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is
> auto generated. "quorum.voters" also can't be set statically if broker ids
> are auto generated.
>
> Jason has explained some ideas that we've discussed so far; the reason we
intentionally did not include them is that we feel they are outside the
scope of KIP-595. Under the umbrella of KIP-500 we should definitely
address them, though.

At a high level, our belief is that "joining a quorum" and "joining (or
more specifically, registering brokers in) the cluster" would be
de-coupled a bit, where the former should be completed before we do the
latter. More specifically, assuming the quorum is already up and running,
after the newly started broker finds the leader of the quorum it can send a
specific RegisterBroker request including its listener / protocol / etc.,
and upon handling it the leader can send back the uniquely generated broker
id to the new broker, while also executing the "startNewBroker" callback as
the controller.
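
Purely to illustrate that idea (none of these names exist in Kafka today;
RegisterBroker is only an idea floated in this thread under the KIP-500
umbrella), a rough sketch of the leader-side handling might look like:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical types for illustration only; not part of KIP-595 or any Kafka API.
    public class QuorumLeaderSketch {
        public static class BrokerInfo {
            public final String host; public final int port; public final String listenerName;
            public BrokerInfo(String host, int port, String listenerName) {
                this.host = host; this.port = port; this.listenerName = listenerName;
            }
        }

        private final AtomicInteger nextBrokerId = new AtomicInteger(0);
        private final Map<Integer, BrokerInfo> registeredBrokers = new ConcurrentHashMap<>();

        // The quorum leader (acting as controller) assigns a unique id on registration.
        public int handleRegisterBroker(BrokerInfo info) {
            int brokerId = nextBrokerId.getAndIncrement(); // unique because a single leader assigns ids
            registeredBrokers.put(brokerId, info);
            startNewBroker(brokerId, info);                // the "startNewBroker" callback mentioned above
            return brokerId;
        }

        private void startNewBroker(int brokerId, BrokerInfo info) {
            // controller-side bookkeeping would happen here
        }
    }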


>
> > > 102. Log compaction. One weak spot of log compaction is for the
> consumer
> > to
> > > deal with deletes. When a key is deleted, it's retained as a tombstone
> > > first and then physically removed. If a client misses the tombstone
> > > (because it's physically removed), it may not be able to update its
> > > metadata properly. The way we solve this in Kafka is based on a
> > > configuration (log.cleaner.delete.retention.ms) and we expect a
> consumer
> > > having seen an old key to finish reading the deletion tombstone within
> > that
> > > time. There is no strong guarantee for that since a broker could be
> down
> > > for a long time. It would be better if we can have a more reliable way
> of
> > > dealing with deletes.
> > >
> >
> > We propose to capture this in the "FirstDirtyOffset" field of the quorum
> > record fetch response: the offset is the maximum offset that log compaction
> > has reached up to. If the follower has fetched beyond this offset, it is
> > safe, since it has seen all records up to that offset. On getting
> > the response, the follower can then decide whether its end offset is actually
> > below that dirty offset (and hence it may miss some tombstones). If that's the
> > case:
> >
> > 1) Naively, it could re-bootstrap the metadata log from the very beginning to
> > catch up.
> > 2) During that time, it would refrain from answering MetadataRequest
> > from any clients.
> >
> >
> I am not sure that the "FirstDirtyOffset" field fully addresses the issue.
> Currently, the deletion tombstone is not removed immediately after a round
> of cleaning. It's removed after a delay in a subsequent round of cleaning.
> Consider an example where a key insertion is at offset 200 and a deletion
> tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
> follower/observer fetches from offset 0  and fetches the key at offset 200.
> A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
> tombstone at 400 is physically removed. The follower/observer continues the
> fetch, but misses offset 400. It catches up all the way to FirstDirtyOffset
> and declares its metadata as ready. However, its metadata could be stale
> since it actually misses the deletion of the key.
>
> Yeah good question, I should have put more details in my explanation :)

The idea is that we will adjust log compaction for this Raft-based
metadata log. Before going into more detail: we have two types
of "watermarks" here. In Kafka, the watermark indicates where every
replica has replicated up to, whereas in Raft the watermark indicates where the
majority of replicas (here only the voters of the quorum, not
counting observers) have replicated up to; let's call them the Kafka watermark
and the Raft watermark. For this special log, we would maintain both watermarks.

When log compacting on the leader, we would only compact up to the Kafka
watermark, i.e. if there is at least one voter that has not replicated an
entry, it would not be compacted. The "dirty-offset" is the offset that
we've compacted up to and is co
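
To make the two checks above concrete, a hedged sketch with hypothetical names
(not actual Kafka code): the leader's compaction bound is the minimum offset
replicated by all voters, and a follower re-bootstraps if its end offset is
below the reported dirty offset.

    import java.util.Collection;

    public class CompactionBoundSketch {
        // "Kafka watermark": the minimum offset that every voter has replicated.
        public static long kafkaWatermark(Collection<Long> voterEndOffsets) {
            return voterEndOffsets.stream().mapToLong(Long::longValue).min().orElse(0L);
        }

        // The leader only compacts the metadata log up to the Kafka watermark, so an
        // entry that at least one voter has not replicated is never compacted away.
        public static long maxCompactionOffset(Collection<Long> voterEndOffsets) {
            return kafkaWatermark(voterEndOffsets);
        }

        // Follower/observer side: decide whether it may have missed tombstones, per the
        // "FirstDirtyOffset" idea discussed earlier in this thread.
        public static boolean mustReBootstrap(long followerEndOffset, long firstDirtyOffset) {
            return followerEndOffset < firstDirtyOffset;
        }
    }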

Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-05-07 Thread Guozhang Wang
Hello folks,

I've also updated the KIP wiki page adding a section of the proposed
metrics for this Raft protocol:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP-595:ARaftProtocolfortheMetadataQuorum-Metrics

Please let us know if you have any thoughts about them as well.

Guozhang

On Thu, May 7, 2020 at 4:04 PM Guozhang Wang  wrote:

> Thanks Jun. Further replies are in-lined.
>
> On Mon, May 4, 2020 at 11:58 AM Jun Rao  wrote:
>
>> Hi, Guozhang,
>>
>> Thanks for the reply. A few more replies inlined below.
>>
>> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang  wrote:
>>
>> > Hello Jun,
>> >
>> > Thanks for your comments! I'm replying inline below:
>> >
>> > On Fri, May 1, 2020 at 12:36 PM Jun Rao  wrote:
>> >
>> > > 101. Bootstrapping related issues.
>> > > 101.1 Currently, we support auto broker id generation. Is this
>> supported
>> > > for bootstrap brokers?
>> > >
>> >
>> > The voter ids would just be the broker ids. "bootstrap.servers" would be
>> > similar to what client configs have today, where "quorum.voters" would
>> be
>> > pre-defined config values.
>> >
>> >
>> My question was on the auto generated broker id. Currently, the broker can
>> choose to have its broker Id auto generated. The generation is done
>> through
>> ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id
>> is
>> auto generated. "quorum.voters" also can't be set statically if broker ids
>> are auto generated.
>>
>> Jason has explained some ideas that we've discussed so far; the reason we
> intentionally did not include them is that we feel they are outside the
> scope of KIP-595. Under the umbrella of KIP-500 we should definitely
> address them, though.
>
> At a high level, our belief is that "joining a quorum" and "joining (or
> more specifically, registering brokers in) the cluster" would be
> de-coupled a bit, where the former should be completed before we do the
> latter. More specifically, assuming the quorum is already up and running,
> after the newly started broker finds the leader of the quorum it can send a
> specific RegisterBroker request including its listener / protocol / etc.,
> and upon handling it the leader can send back the uniquely generated broker
> id to the new broker, while also executing the "startNewBroker" callback as
> the controller.
>
>
>>
>> > > 102. Log compaction. One weak spot of log compaction is for the
>> consumer
>> > to
>> > > deal with deletes. When a key is deleted, it's retained as a tombstone
>> > > first and then physically removed. If a client misses the tombstone
>> > > (because it's physically removed), it may not be able to update its
>> > > metadata properly. The way we solve this in Kafka is based on a
>> > > configuration (log.cleaner.delete.retention.ms) and we expect a
>> consumer
>> > > having seen an old key to finish reading the deletion tombstone within
>> > that
>> > > time. There is no strong guarantee for that since a broker could be
>> down
>> > > for a long time. It would be better if we can have a more reliable
>> way of
>> > > dealing with deletes.
>> > >
>> >
>> > We propose to capture this in the "FirstDirtyOffset" field of the quorum
>> > record fetch response: the offset is the maximum offset that log compaction
>> > has reached up to. If the follower has fetched beyond this offset, it is
>> > safe, since it has seen all records up to that offset. On getting
>> > the response, the follower can then decide whether its end offset is actually
>> > below that dirty offset (and hence it may miss some tombstones). If that's the
>> > case:
>> >
>> > 1) Naively, it could re-bootstrap the metadata log from the very beginning
>> > to catch up.
>> > 2) During that time, it would refrain from answering MetadataRequest
>> > from any clients.
>> >
>> >
>> I am not sure that the "FirstDirtyOffset" field fully addresses the issue.
>> Currently, the deletion tombstone is not removed immediately after a round
>> of cleaning. It's removed after a delay in a subsequent round of cleaning.
>> Consider an example where a key insertion is at offset 200 and a deletion
>> tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
>> follower/observer fetches from offset 0  and fetches the key at offset
>> 200.
>> A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
>> tombstone at 400 is physically removed. The follower/observer continues
>> the
>> fetch, but misses offset 400. It catches up all the way to FirstDirtyOffset
>> and declares its metadata as ready. However, its metadata could be stale
>> since it actually misses the deletion of the key.
>>
>> Yeah good question, I should have put more details in my explanation :)
>
> The idea is that we will adjust log compaction for this Raft-based
> metadata log. Before going into more detail: we have two types
> of "watermarks" here. In Kafka, the watermark indicates where every
> replica ha

Re: [REVIEW REQUEST] KAFKA-3184: Add Checkpoint for In-memory State Store

2020-05-07 Thread Guozhang Wang
Thanks Nikolay,

We will take a look at the PR asap.


Guozhang

On Thu, May 7, 2020 at 4:15 AM Nikolay Izhikov  wrote:

> Hello, Kafka Team.
>
> I prepared a PR [1] for the KAFKA-3184 [2]
> Can someone, please, do the review.
>
>
> [1] https://github.com/apache/kafka/pull/8592
> [2] https://issues.apache.org/jira/browse/KAFKA-3184
>


-- 
-- Guozhang


Re: [DISCUSS] Kafka 3.0

2020-05-07 Thread Guozhang Wang
Hey folks,

Sorry for stating earlier that the bridge release would not break any
compatibility; that was incorrect and confused many people.

I think one way to think about the versioning is that:

0) In a 2.x version going forward, we would deprecate the ZK-dependent tooling,
such as the --zookeeper flags in various scripts (KIP-555).

1) In 3.0 we would make at least one incompatible change, for example removing
the deprecated ZK flags.

2) In a future major version (e.g. 4.0) we would drop ZK entirely,
including usages such as security credentials / broker registration / etc.
that go through ZK today.

Then for the bridge release(s), it can be any or all of 3.x.


For 1), I'd love to add a few more incompatible changes in 3.0 from
Kafka Streams: we evolve Streams public APIs by deprecating and then removing
them in major releases, and since 2.0 we've accumulated quite a few deprecated
APIs. I can compile a list of the KIPs that contain those if people are
interested.


Guozhang


On Thu, May 7, 2020 at 3:53 PM Colin McCabe  wrote:

> On Wed, May 6, 2020, at 21:33, Ryanne Dolan wrote:
> > > In fact, we know that the bridge release will involve at least one
> > > incompatible change.  We will need to drop support for the --zookeeper
> > > flags in the command-line tools.
> >
> > If the bridge release(s) and the subsequent post-ZK release are _both_
> > breaking changes, I think we only have one option: the 3.x line are the
> > bridge release(s), and ZK is removed in 4.0, as suggested by Andrew
> > Schofield.
> >
> > Specifically:
> > - in order to _remove_ (not merely deprecate) the --zookeeper args, we
> will
> > need a major release.
> > - in order to drop support for ZK entirely (e.g. break a bunch of external
> > tooling like Cruise Control), we will need a major release.
> >
> > I count two major releases.
>
> Hi Ryanne,
>
> I agree that dropping ZK completely will need a new major release after
> 3.0.  I think that's OK and in keeping with how we've handled deprecation
> and removal in the past.  It's important for users to have a smooth upgrade
> path.
>
> best,
> Colin
>
> >
> > Ryanne
> >
> > -
> >
> > On Wed, May 6, 2020 at 10:52 PM Colin McCabe  wrote:
> >
> > > On Mon, May 4, 2020, at 17:12, Ryanne Dolan wrote:
> > > > Hey Colin, I think we should wait until after KIP-500's "bridge
> > > > release" so there is a clean break from Zookeeper after 3.0. The
> > > > bridge release by definition is an attempt to not break anything, so
> > > > it theoretically doesn't warrant a major release.
> > >
> > > Hi Ryanne,
> > >
> > > I think it's important to clarify this a little bit.  The bridge
> release
> > > (really, releases, plural) allow you to upgrade from a cluster that is
> > > using ZooKeeper to one that is not using ZooKeeper.  But, that doesn't
> > > imply that the bridge release itself doesn't break anything.  Upgrading
> > > to the bridge release itself might involve some minor incompatibility.
> > >
> > > Kafka does occasionally have incompatible changes.  In those cases, we
> > > bump the major version number.  One example is that when we went from
> > > Kafka 1.x to Kafka 2.0, we dropped support for JDK7.  This is an
> > > incompatible change.
> > >
> > > In fact, we know that the bridge release will involve at least one
> > > incompatible change.  We will need to drop support for the --zookeeper
> > > flags in the command-line tools.
> > >
> > > We've been preparing for this change for a long time.  People have
> spent
> > > a lot of effort designing new APIs that can be used instead of the old
> > > zookeeper-based code that some of the command-line tools used.  We have
> > > also deprecated the old ZK-based flags.  But at the end of the day, it
> > > is still an incompatible change.  So it's unfortunately not possible
> for
> > > the
> > > bridge release to be a 2.x release.
> > >
> > > > If that's not the case (i.e. if a single "bridge release" turns out
> to
> > > > be impractical), we should consider forking 3.0 while maintaining a
> > > > line of Zookeeper-dependent Kafka in 2.x. That way 3.x can evolve
> > > > dramatically without breaking the 2.x line. In particular, anything
> > > > related to removing Zookeeper could land in pre-3.0 while every other
> > > > feature targets 2.6.
> > >
> > > Just to be super clear about this, what we want to do here is support
> > > operating in __either__ KIP-500 mode and legacy mode for a while.  So
> the
> > > same branch will have support for both the old way and the new way of
> > > managing metadata.
> > >
> > > This will allow us to get an "alpha" version of the KIP-500 mode out
> early
> > > for people to experiment with.  It also greatly reduces the number of
> Kafka
> > > releases we have to make, and the amount of backporting we have to do.
> > >
> > > >
> > > > If you are proposing 2.6 should be the "bridge release", I think this
> > > > is premature given Kafka's time-based release schedule. If the bridge
> > > > features happen to be merge

[jira] [Created] (KAFKA-9971) Error Reporting in Sink Connectors

2020-05-07 Thread Aakash Shah (Jira)
Aakash Shah created KAFKA-9971:
--

 Summary: Error Reporting in Sink Connectors
 Key: KAFKA-9971
 URL: https://issues.apache.org/jira/browse/KAFKA-9971
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Aakash Shah


Currently, 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]
 provides error handling in Kafka Connect that includes functionality such as 
retrying, logging, and sending errant records to a dead letter queue. However, 
the dead letter queue functionality from KIP-298 only supports error reporting 
within the contexts of the transform operation and the key, value, and header 
converter operations. Within the context of the {{put(...)}} method in sink 
connectors, there is no support for dead letter queue/error reporting 
functionality. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-05-07 Thread Guozhang Wang
Just to add a bit more FYI here related to the last question from David:
in KIP-595, while implementing the new requests, we are also adding a
"KafkaNetworkChannel" which is used by brokers to send vote / fetch
records, so besides the discussion on listeners I think implementation-wise
we can also consider consolidating a lot of those onto the same call-trace
as well -- of course this is not related to public APIs, so it maybe just needs
to be coordinated among the development efforts:

1. Broker -> Controller: ISR Change, Topic Creation, Admin Redirect
(KIP-497).
2. Controller -> Broker: LeaderAndISR / MetadataUpdate; though these are
likely going to be deprecated post KIP-500.
3. Txn Coordinator -> Broker: TxnMarker


Guozhang

On Wed, May 6, 2020 at 8:58 PM Boyang Chen 
wrote:

> Hey David,
>
> thanks for the feedbacks!
>
> On Wed, May 6, 2020 at 2:06 AM David Jacot  wrote:
>
> > Hi Boyang,
> >
> > While re-reading the KIP, I've got few small questions/comments:
> >
> > 1. When auto topic creation is enabled, brokers will send a
> > CreateTopicRequest
> > to the controller instead of writing to ZK directly. It means that
> > creation of these
> > topics is subject to being rejected with an error if a CreateTopicPolicy is
> > used. Today,
> > it bypasses the policy entirely. I suppose that clusters allowing auto
> > topic creation
> > don't have a policy in place, so it is not a big deal. I suggest calling
> > out the limitation
> > explicitly in the KIP, though.
> >
> > That's a good idea, will add to the KIP.
>
>
> > 2. In the same vein as my first point. How do you plan to handle errors
> > when internal
> > topics are created by a broker? Do you plan to retry retryable errors
> > indefinitely?
> >
> > I checked a bit on the admin client handling of the create topic RPC. It
> seems that
> the only retriable exceptions at the moment are NOT_CONTROLLER and
> REQUEST_TIMEOUT.
> So I guess we just need to retry on these exceptions?
>
>
> > 3. Could you clarify which listener will be used for the internal
> requests?
> > Do you plan
> > to use the control plane listener or perhaps the inter-broker listener?
> >
> > As we discussed in the KIP, the internal design for the
> broker->controller channel has not been
> done yet, and I feel it makes sense to consolidate the redirect RPC and the
> internal topic creation RPC onto this future channel.
> Those details will be filled in in the near future; right now some
> controller refactoring effort is still WIP.
>
>
> > Thanks,
> > David
> >
> > On Mon, May 4, 2020 at 9:37 AM Sönke Liebau
> >  wrote:
> >
> > > Ah, I see, thanks for the clarification!
> > >
> > > Shouldn't be an issue I think. My understanding of KIPs was always that
> > > they are mostly intended as a place to discuss and agree changes up
> > front,
> > > whereas tracking the actual releases that things go into should be
> > handled
> > > in Jira.
> > > So maybe we just create new jiras for any subsequent work and either
> link
> > > those or make them subtasks (even though this jira is already a subtask
> > > itself), that should allow us to properly track all releases that work
> > goes
> > > into.
> > >
> > > Thanks for your work on this!!
> > >
> > > Best,
> > > Sönke
> > >
> > >
> > > On Sat, 2 May 2020 at 00:31, Boyang Chen 
> > > wrote:
> > >
> > > > Sure thing Sönke, what I suggest is that usually KIPs get accepted to go
> > > > into the next release. It could span a couple of releases because of
> > > > engineering time, but no change has to be shipped in a specific future
> > > > release, like the backward-incompatible change for KafkaPrincipal. But I
> > > > guess it's not really a blocker, as long as we state clearly in the KIP how
> > > > we are going to roll things out, and let it partially finish in 2.6.
> > > >
> > > > Boyang
> > > >
> > > > On Fri, May 1, 2020 at 2:32 PM Sönke Liebau
> > > >  wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > thanks for the update, sounds reasonable to me. Making it a
> breaking
> > > > change
> > > > > is definitely the safer route to go.
> > > > >
> > > > > Just one quick question regarding your mail, I didn't fully
> > understand
> > > > what
> > > > > you mean by "I think this is the first time we need to introduce a
> > KIP
> > > > > without having it
> > > > > fully accepted in next release."  - could you perhaps explain that
> > some
> > > > > more very briefly?
> > > > >
> > > > > Best regards,
> > > > > Sönke
> > > > >
> > > > >
> > > > >
> > > > > On Fri, 1 May 2020 at 23:03, Boyang Chen <
> reluctanthero...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Hey Tom,
> > > > > >
> > > > > > thanks for the suggestion. As long as we could correctly serialize the
> > > > > > principal and embed it in the Envelope, I think we could still leverage the
> > > > > > controller to do the client request authentication. Although this pays an
> > > > > > extra round trip if the authorization is doomed to fa
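
Coming back to David's first point about CreateTopicPolicy earlier in this
thread: the policy is the existing pluggable broker-side interface configured
via create.topic.policy.class.name, so auto-created topics routed through the
controller would start flowing through code along these lines (the 50-partition
limit below is an arbitrary example, not a recommendation):

    import java.util.Map;
    import org.apache.kafka.common.errors.PolicyViolationException;
    import org.apache.kafka.server.policy.CreateTopicPolicy;

    // Example policy only; the partition limit is arbitrary.
    public class MaxPartitionsPolicy implements CreateTopicPolicy {
        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
            Integer partitions = requestMetadata.numPartitions();
            if (partitions != null && partitions > 50) {
                throw new PolicyViolationException(
                    "Topic " + requestMetadata.topic() + " requests too many partitions: " + partitions);
            }
        }

        @Override
        public void close() {}
    }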

Fwd: Is committing offset required for Consumer

2020-05-07 Thread Boyuan Zhang
Add dev mailing list as well.

Thanks for your help!
-- Forwarded message -
From: Boyuan Zhang 
Date: Thu, May 7, 2020 at 3:51 PM
Subject: Is committing offset required for Consumer
To: 


Hi team,

I'm building an application which uses Kafka Consumer APIs to read messages
from topics. I plan to manually assign TopicPartitions to my consumer and
seek a certain offset before starting to read. I'll also materialize the
last read offset and reuse it when creating the consumer later.
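
For reference, a minimal sketch of that pattern, with a hypothetical topic,
partition, and stored offset; enable.auto.commit is disabled since the offsets
are materialized outside Kafka:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualAssignmentExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.auto.commit", "false");              // offsets are stored externally
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            // No group.id is needed for manual assignment unless you want to commit to Kafka.

            long storedOffset = 42L;                               // hypothetical, loaded from your own store
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic/partition

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));    // manual assignment, no group rebalancing
                consumer.seek(tp, storedOffset);
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record, then persist record.offset() + 1 as the next start offset
                    }
                }
            }
        }
    }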

Within my usage, I'm curious whether I need to commit offsets automatically
or manually. While going through the docs, it seems like committing offsets
is only important for dynamic assignment.

Another question around manual assignment: is it still true that I need
to call poll() continuously to keep the consumer in the group, as described
below?

> It is also possible that the consumer could encounter a "livelock"
> situation where it is continuing to send heartbeats, but no progress is
> being made. To prevent the consumer from holding onto its partitions
> indefinitely in this case, we provide a liveness detection mechanism using
> the max.poll.interval.ms setting. Basically if you don't call poll at
> least as frequently as the configured max interval, then the client will
> proactively leave the group so that another consumer can take over its
> partitions. When this happens, you may see an offset commit failure (as
> indicated by a CommitFailedException
> 
>  thrown
> from a call to commitSync()
> ).
> This is a safety mechanism which guarantees that only active members of the
> group are able to commit offsets. So to stay in the group, you must
> continue to call poll.

What will happen to poll() with manual assignment if the consumer is
removed from the group?

Thanks for your help!

Boyuan


Jenkins build is back to normal : kafka-trunk-jdk11 #1435

2020-05-07 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk14 #66

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)


--
[...truncated 6.16 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldGetSinkTopicNames[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldGetSinkTopicNames[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] S

Jenkins build is back to normal : kafka-trunk-jdk8 #4512

2020-05-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-606: Add Metadata Context to MetricsReporter

2020-05-07 Thread Randall Hauch
Thanks, Xavier.

This does seem very useful. A minor request would be to mention the new
configs for Connect, Streams, and clients, specifically that because they
are optional they will not hinder upgrades, and because they are namespaced
they are unlikely to clash or conflict with other configs from extensions.
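
For illustration, a hedged sketch of what the namespaced client configs could
look like once the KIP lands; the metrics.context. prefix comes from the
proposal, while the label keys below are invented for the example and nothing
here is part of a released version:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MetricsContextConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            // Namespaced metadata labels as proposed by KIP-606; they would be passed
            // through to MetricsReporter implementations via the new MetricsContext.
            props.put("metrics.context.environment", "staging");    // hypothetical label
            props.put("metrics.context.team", "data-platform");     // hypothetical label
            new KafkaProducer<String, String>(props).close();
        }
    }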

Thanks, and best regards.

Randall

On Thu, May 7, 2020 at 5:38 PM Gwen Shapira  wrote:

> This would be useful, thank you :)
>
> On Tue, May 5, 2020 at 4:58 PM Xavier Léauté  wrote:
>
> > Hi Everyone,
> >
> > I've published a KIP to address some shortcoming of our current metrics
> > reporter interface. Would appreciate feedback.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter
> >
> > Thank you,
> > Xavier
> >
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: default auto.offset.reset value is reset to none

2020-05-07 Thread Matthias J. Sax
There is a ticket about it: https://issues.apache.org/jira/browse/KAFKA-7480

The current solution is to delete the local copy of the global state
store manually to be able to restart.


-Matthias

On 5/6/20 6:14 AM, Alexander Sibiryakov wrote:
> Hello,
> 
> I'm facing an issue in one of our Kafka Streams applications using
> GlobalKTable. The idea is to have a GlobalKTable over a compacted topic
> and be able to re-read it on startup. We had a consumer group and topic
> some time ago; recently I've recreated the topic, requiring consumer
> offsets to be reset and consumed from the beginning. But the application started
> to fail with OffsetOutOfRangeException and the message "Offsets out of range
> with no configured reset policy for partitions..". I do have
> auto.offset.reset set to "latest" in my configuration, but it is
> overridden to "none" for the global and restore consumers of the Streams
> application. 
> 
> This exception results in a shutdown loop and requires
> investigation to understand what is going on.
> 
> 
> This is the line where it is happening
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1244
> 
> So the question is: is this behavior of overriding the offset reset policy
> intended? If not, please confirm it is a bug, and I will submit a patch.
> 
> See the Streams output for detailed configs, traceback and exceptions in
> attachment.
> 
> Thanks,
> A.





Re: Please give me the permission to create KIPs

2020-05-07 Thread Matthias J. Sax
Done.

On 5/6/20 3:06 AM, Eddi Em wrote:
> My id: eddiejamsession
> 
> Eddie
> 





Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-07 Thread Christopher Egerton
Hi Aakash,

Thanks for the KIP! Given the myriad of different error-handling mechanisms
available in sink connectors today, I think it'd be a great improvement to
add this kind of support to the framework and not only take some of the
development burden off of connector writers but also some of the
configuration burden off of users.

I've got some thoughts but no major objections:

1. It's stated in the "Motivation" section that "this proposal aims to
extend KIP-298 and add error reporting functionality within the context of
put(...) without adding redundancy in configuration." It sounds like this
is true insofar as we're trying to fill a gap explicitly called out by
KIP-298 with regards to records that are given to a task via SinkTask::put,
but reading further, it seems like there's nothing in place that would
prevent a task from sending a record to the error reporter outside of its
"put" method. Would it be fair to say that the proposal "aims to extend
KIP-298 and add error reporting functionality even after records are sent
to connector tasks without adding redundancy in configuration" instead?

2. I'm assuming that the javadoc comment above the "errantRecordReporter"
method in the sample Java code in the "Method" section is the proposed
javadoc description for the method we'll add to the SinkTask class. We
should be pretty careful about that description since it's the very first
thing some users will read when learning about this new error reporting
mechanism; I've got some suggestions about how we might improve the
language there:

a - In the first paragraph, "Set the error reporter for this task.", I
think we should call out that this is a method that will be invoked by the
Connect framework. Maybe something like "Invoked by the framework to supply
the task with an error reporter, if the user has configured one for this
connector"?

b - In the next paragraph, "The general usage pattern for this error
reporter is to use this method to set it, and invoke its {@link
accept(SinkRecord record, Throwable e)} method when an exception is thrown
while processing a record in {@link put(Collection records)} to
send this errant record to the error reporter.", I think we can be a little
more concise. What do you think about just "Tasks can send problematic
records back to the framework  by invoking {@link accept(SinkRecord record,
Throwable e)}"?

3. Just below that section, in the sample code, it looks like the task
isn't performing a null check on the "errorReporter" field. According to
the "Backwards Compatibility" section, it's possible that that field might
be null; do you think we might want to update the sample code to perform
that null check and add a brief comment on why?
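
To make point 3 concrete, a hedged sketch of such a null check inside put(...);
the reporter is modeled here as a plain BiConsumer for illustration only, since
the exact interface and setter proposed in the KIP may still change and are not
part of the released Connect API:

    import java.util.Collection;
    import java.util.function.BiConsumer;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class ExampleSinkTask extends SinkTask {
        private BiConsumer<SinkRecord, Throwable> errorReporter; // remains null on older workers

        // Hypothetical setter following the shape discussed in KIP-610.
        public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
            this.errorReporter = reporter;
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                try {
                    write(record);
                } catch (Exception e) {
                    if (errorReporter != null) {
                        errorReporter.accept(record, e);   // hand the errant record back to the framework
                    } else {
                        // Running on a worker without the new error reporter: fall back to failing.
                        throw new ConnectException("Failed to write record", e);
                    }
                }
            }
        }

        protected abstract void write(SinkRecord record);
    }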

4. It looks like the config properties are only related to setting up a DLQ
for a connector. Although properties like "errors.retry.timeout" and "
errors.retry.delay.max.ms" are probably out of scope for this KIP, why not
also include the "errors.tolerance", "errors.log.enable", and
"errors.log.include.messages" properties? Seems like these would also be
relevant for the kinds of records that a task might send to its error
reporter. If we decide to include these properties, we should also update
the "Synchrony" section to be agnostic about what the error reporter is
doing under the hood since there won't necessarily be a Kafka producer
involved in handling records given to the error reporter.

5. In the "Order of Operations section, the second step is "The task calls
start(...)". Isn't SinkTask::start invoked by the framework, and not
normally by tasks themselves?

6. The "Metrics" section is a little sparse; do you think you could add
some detail on what format the metrics will take (presumably JMX but other
ideas are always welcome), and what their names and types will be? Might
help to check out some other KIPs in the past that have added metrics such
as KIP-475 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-475%3A+New+Metrics+to+Measure+Number+of+Tasks+on+a+Connector),
KIP-507 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints#KIP-507:SecuringInternalConnectRESTEndpoints-NewJMXworkermetric),
and last but not least, KIP-196 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework#KIP-196:AddmetricstoKafkaConnectframework-PublicInterfaces
).

7. In the "Rejected Alternatives" section, I think there's a rationale
missing for the item "Labeling as a dead letter queue" :)

Thanks again for the KIP! Looking forward to seeing this in the framework,
I think it'd be great.

Cheers,

Chris

On Thu, May 7, 2020 at 12:02 PM Andrew Schofield 
wrote:

> Hi,
> Thanks for the KIP.
>
> I wonder whether this idea would be better implemented using a new method
> on the
> SinkTaskContext.
>
>   public void putFailed(Collection records)
>
> Then the rest of KIP-298 could apply. Failed records could be put to the
> DLQ or logged, as
> appropriate. 

Re: [Discuss] KIP-582 Add a "continue" option for Kafka Connect error handling

2020-05-07 Thread Christopher Egerton
Hi Zihan,

In my experience, direct analysis of a record isn't often necessary when
diagnosing issues with conversion or transformations. I raised the idea of
log messages because in most cases they've been completely sufficient,
paired with the source code for the converter or the transformation, to
help me diagnose issues with records in Connect.

What I'm trying to understand here is what the cases are where directly
examining a record is necessary, and how, in those cases, a console
consumer might not be sufficient. What kinds of errors are you thinking of
that might be difficult to debug with the current setup, and would benefit
from analysis in an external system?
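
For context, when the KIP-298 DLQ is configured with
errors.deadletterqueue.context.headers.enable=true, the error context is already
attached to each errant record as headers, so one way to inspect it outside a
console consumer is a small standalone consumer along these lines (the topic and
group names are hypothetical, and the header key prefix should be verified
against your Connect version):

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class DlqHeaderDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "dlq-inspector");                 // hypothetical group
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-connector-dlq")); // hypothetical DLQ topic
                while (true) {
                    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                        for (Header header : record.headers()) {
                            // "__connect.errors." is the prefix used by the Connect DLQ per KIP-298.
                            if (header.key().startsWith("__connect.errors.")) {
                                String value = header.value() == null
                                    ? "null" : new String(header.value(), StandardCharsets.UTF_8);
                                System.out.println(header.key() + " = " + value);
                            }
                        }
                    }
                }
            }
        }
    }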

Cheers,

Chris

On Wed, May 6, 2020 at 12:29 PM Zihan Li  wrote:

> Hi Chris,
>
> Thanks a lot for the reply.
>
> To make error messages easy to find, I think we can add messages to the
> headers of a broken record, just like the DLQ mechanism does. This way, even
> if the broken record is stored in an external system, error messages and
> broken records stay paired.
>
> Currently, after a broken record is sent to the DLQ, there are usually two
> ways
> to analyze it. One method is to use some consumer tool to examine the
> messages directly. The other is to consume the DLQ again into an external
> system for analysis. This proposal would help in the second case by
> eliminating the DLQ sink connector. In the first case, most open-source
> consumer tools are not as powerful as external tools in terms of querying,
> aggregating, and finding patterns in byte messages. Confluent KSQL is a
> powerful consumer tool, but it is not part of the open-source project.
> Therefore I think the proposal would also help in the first case, by not only
> flattening the learning curve of consumer tools but also enabling external
> tools for
> analysis.
>
> Best,
> Zihan
>
> On 2020/05/03 17:36:34, Christopher Egerton  wrote:
> > Hi Zihan,
> >
> > I guess I'm still unclear on exactly what form this analysis might take.
> If
> > a converter has an issue (de)-serializing a record, for example, the
> first
> > thing I check out is the stack trace in the worker logs that tells me
> what
> > went wrong and where. The same goes for errors thrown during
> > transformation. Can we have some concrete examples about what kind of
> > analysis
> > performed on byte arrays in external systems might be more informative,
> > especially when it would either be performed without easy-to-find log
> > messages or require extra effort to make those log messages easy to find
> > and associate with the bytes in the external system?
> >
> > Cheers,
> >
> > Chris
> >
> > On Thu, Apr 30, 2020 at 1:01 PM Zihan Li  wrote:
> >
> > > Hi Chris and Andrew,
> > >
> > > Thanks a lot for your reply!
> > >
> > > I think in most cases it is easier to analyze broken records in an
> > > external
> > > system rather than in a Kafka DLQ topic. While it might be possible to
> > > directly analyze broken records with Kafka, people are generally more
> > > familiar with external tools, such as file systems and relational
> > > databases.
> > > Exporting broken records to those external systems would enable many
> more
> > > analysis tools. Users can use those tools to audit end-to-end data flow
> > > and
> > > work with upstream teams to improve data quality. As a result, in many
> > > cases, DLQ is consumed again by an additional connector for further
> > > analysis.
> > > So as Chris has mentioned, the point of this KIP is to save users the
> > > extra
> > > time and effort of maintaining and tuning this additional DLQ sink connector.
> > >
> > > The expected behavior of this new error handling option should be
> > > consistent
> > > with DLQ. Namely, if any of key, value or header is broken, the record
> > > should be sent to SinkTask.putBrokenRecord() instead of DLQ.
> > >
> > > Best,
> > > Zihan
> > >
> > > On 2020/04/25 20:05:37, Christopher Egerton 
> wrote:
> > > > Hi Zihan,
> > > >
> > > > Thanks for the changes and the clarifications! I agree that the
> > > complexity
> > > > of maintaining a second topic and a second connector is a fair
> amount of
> > > > work; to Andrew's question, it seems less about the cost of just
> running
> > > > another connector, and more about managing that second connector (and
> > > > topic) when a lot of the logic is identical, such as topic ACLs,
> > > > credentials for the connector to access the external system, and
> other
> > > > fine-tuning.
> > > >
> > > > However, I'm still curious about the general use case here. For
> example,
> > > if
> > > > a converter fails to deserialize a record, it seems like the right
> thing
> > > to
> > > > do would be to examine the record, try to understand why it's
> failing,
> > > and
> > > > then find a converter that can handle it. If the raw byte array for
> the
> > > > Kafka message gets written to the external system instead, what's the
> > > > benefit to the user? Yes, they won't have to configure another
> connector
> > > > and manage another topi

Build failed in Jenkins: kafka-trunk-jdk11 #1436

2020-05-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6145: Set HighAvailabilityTaskAssignor as default in


--
[...truncated 3.08 MB...]

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFact

Jenkins build is back to normal : kafka-2.5-jdk8 #112

2020-05-07 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-9972) Corrupted standby task could be committed

2020-05-07 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9972:
--

 Summary: Corrupted standby task could be committed
 Key: KAFKA-9972
 URL: https://issues.apache.org/jira/browse/KAFKA-9972
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


A corrupted standby task could revive and transition to the CREATED state, where 
it will then be picked up by `taskManager.commitAll` in the next runOnce, causing 
an illegal state (a toy sketch and the observed log excerpt follow):
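A toy Java model of the missing state check (illustrative only, not the Kafka Streams internals; the TaskState enum and the committable-state guard below are assumptions about the shape of a fix):

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Toy model: a standby task that was closed dirty and re-created is back in
// CREATED state; a commit pass that ignores task state would throw, while a
// guard on a committable state simply skips it.
public class StandbyCommitSketch {

    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final Set<TaskState> COMMITTABLE = EnumSet.of(TaskState.RUNNING, TaskState.RESTORING);

    static void commitAll(List<TaskState> taskStates) {
        for (TaskState state : taskStates) {
            if (!COMMITTABLE.contains(state)) {
                // Skip the revived CREATED task instead of preparing it for commit.
                continue;
            }
            System.out.println("committing task in state " + state);
        }
    }

    public static void main(String[] args) {
        // One healthy running task plus one corrupted standby that was re-created.
        commitAll(List.of(TaskState.RUNNING, TaskState.CREATED));
    }
}
```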

```

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,646] WARN 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException 
fetching records from restore consumer for partitions 
[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1], it is 
likely that the consumer's position has fallen out of the topic partition 
offset range because the topic was truncated or compacted on the broker, 
marking the corresponding tasks as corrupted and re-initializing it later. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,646] WARN 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Detected 
the states of tasks 
\{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} 
are corrupted. Will close the task as dirty and re-create and bootstrap from 
scratch. (org.apache.kafka.streams.processor.internals.StreamThread)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
\{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} 
are corrupted and hence needs to be re-initialized

        at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,652] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
[Consumer 
clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions 
(org.apache.kafka.clients.consumer.KafkaConsumer)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,652] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
standby-task [1_1] Prepared dirty close 
(org.apache.kafka.streams.processor.internals.StandbyTask)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,679] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
standby-task [1_1] Closed dirty 
(org.apache.kafka.streams.processor.internals.StandbyTask)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,751] ERROR 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
java.lang.IllegalStateException: Illegal state CREATED while preparing standby 
task 1_1 for committing

        at 
org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)

        at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)

        at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)

        at 
o

[jira] [Resolved] (KAFKA-9290) Update IQ related JavaDocs

2020-05-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9290.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Update IQ related JavaDocs
> --
>
> Key: KAFKA-9290
> URL: https://issues.apache.org/jira/browse/KAFKA-9290
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: highluck
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 2.6.0
>
>
> In Kafka 2.1.0 we deprecated a couple of methods (KAFKA-7277) in favor of 
> passing timestamps into the IQ API via Duration/Instant parameters instead of plain longs.
> In Kafka 2.3.0 we introduced TimestampedXxxStores (KAFKA-3522) and allowed IQ 
> to return the stored timestamp.
> However, we never updated our JavaDocs that contain code snippets 
> illustrating how a local store can be queried. For example 
> `KGroupedStream#count(Materialized)` 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L116-L122]):
>  
> {code:java}
> * {@code
> * KafkaStreams streams = ... // counting words
> * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
> * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
> * String key = "some-word";
> * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
> * }
> {code}
> We should update all JavaDocs to use `TimestampedXxxStore` and the new 
> Duration/Instant methods in all those code snippets.
>  
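For illustration, an updated snippet along those lines could look roughly like the following (a sketch only; it assumes the same store name as above and the 2.3.0 QueryableStoreTypes.timestampedKeyValueStore() accessor):

{code:java}
KafkaStreams streams = ... // counting words
String queryableStoreName = "storeName"; // name defined by the Materialized instance
ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> localStore =
    streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>timestampedKeyValueStore());
String key = "some-word";
ValueAndTimestamp<Long> countForWord = localStore.get(key); // value plus the record timestamp
{code}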



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9973) __consumer_offsets record is invalid, leading to log cleaning failures and __consumer_offsets growing too big

2020-05-07 Thread leibo (Jira)
leibo created KAFKA-9973:


 Summary: __consumer_offsets record is invalid, leading to log cleaning 
failures and __consumer_offsets growing too big
 Key: KAFKA-9973
 URL: https://issues.apache.org/jira/browse/KAFKA-9973
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.1.1
Reporter: leibo


{code:java}
[2019-12-30 19:21:41,047] INFO [kafka-log-cleaner-thread-6]: Starting 
(kafka.log.LogCleaner)
[2019-12-30 19:21:41,079] INFO [kafka-log-cleaner-thread-7]: Starting 
(kafka.log.LogCleaner)
[2019-12-30 19:21:42,825] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking its 
partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner)
org.apache.kafka.common.record.InvalidRecordException: Found invalid number of 
record headers -47
--
[2019-12-30 20:23:51,340] INFO [kafka-log-cleaner-thread-5]: Starting 
(kafka.log.LogCleaner)
[2019-12-30 20:23:51,361] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-34 for 52 segments in offset range [26842714, 73423730). 
(kafka.log.LogCleaner)
[2019-12-30 20:23:51,398] INFO [kafka-log-cleaner-thread-6]: Starting 
(kafka.log.LogCleaner)
[2019-12-30 20:23:51,493] INFO [kafka-log-cleaner-thread-7]: Starting 
(kafka.log.LogCleaner)
[2019-12-30 20:23:59,596] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking its 
partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner)
org.apache.kafka.common.record.InvalidRecordException: Found invalid number of 
record headers -47

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk14 #67

2020-05-07 Thread Apache Jenkins Server
See