[jira] [Commented] (KAFKA-10375) Restore consumer fails with SSL handshake fail exception

2020-08-28 Thread Satyawati Tripathi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186348#comment-17186348
 ] 

Satyawati Tripathi commented on KAFKA-10375:


Hey, can I take this up?

> Restore consumer fails with SSL handshake fail exception
> 
>
> Key: KAFKA-10375
> URL: https://issues.apache.org/jira/browse/KAFKA-10375
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: stacktrace.txt
>
>
> After upgrading to 2.6, we started getting "SSL handshake fail" exceptions. 
> The curious thing is that it seems to affect only restore consumers. For mTLS, we 
> use dynamic certificates that are reloaded automatically every X 
> minutes.
> We didn't have any issues with it until upgrading to 2.6, and other stream 
> processing jobs running Kafka 2.4 don't have similar problems.
> After restarting the Kafka Streams instance, the issue goes away.
>  
> From the stacktrace, it's visible that problem is:
> {code:java}
> Aug 07 10:36:12.478 | Caused by: 
> java.security.cert.CertificateExpiredException: NotAfter: Fri Aug 07 07:45:16 
> GMT 2020 
> {code}
> It seems that the restore consumer somehow gets stuck with the old certificate and 
> it's not refreshed.
>  
>  
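A minimal diagnostic sketch, not taken from the ticket (the PKCS12 keystore type is an assumption): list the NotAfter dates of the certificates in the keystore the restore consumer is configured with, to check whether the file on disk actually contains the refreshed certificate.

{code:java}
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.Collections;

public class KeystoreExpiryCheck {
    public static void main(String[] args) throws Exception {
        // args[0]: keystore path (e.g. the value of ssl.keystore.location), args[1]: password
        final KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream(args[0])) {
            keyStore.load(in, args[1].toCharArray());
        }
        for (String alias : Collections.list(keyStore.aliases())) {
            final X509Certificate cert = (X509Certificate) keyStore.getCertificate(alias);
            if (cert != null) {
                System.out.println(alias + " NotAfter: " + cert.getNotAfter());
            }
        }
    }
}
{code}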



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7421) Deadlock in Kafka Connect

2020-08-28 Thread Goltseva Taisiia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185891#comment-17185891
 ] 

Goltseva Taisiia edited comment on KAFKA-7421 at 8/28/20, 8:16 AM:
---

And I agree with [~ivanyu], I do not understand why *{{PluginClassLoader}}'s 
loadClass* method is marked as synchronized, because it already has fine-grained 
synchronization immediately inside. And as far as I can see, this method does not 
use any fields of the *{{PluginClassLoader}}* class.

[~kkonstantine] what do you think about it? Can we propose a PR to remove 
synchronized from *{{PluginClassLoader}}'s loadClass* method?
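
For illustration, a minimal sketch (not the actual {{PluginClassLoader}} source) of a child-first loadClass that relies only on the per-name lock from {{ClassLoader.getClassLoadingLock()}}, i.e. the fine-grained synchronization mentioned above:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class SketchPluginClassLoader extends URLClassLoader {

    static {
        // Required so that getClassLoadingLock(name) returns a per-name lock
        // instead of the class loader instance itself.
        ClassLoader.registerAsParallelCapable();
    }

    public SketchPluginClassLoader(final URL[] urls, final ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> klass = findLoadedClass(name);
            if (klass == null) {
                try {
                    klass = findClass(name);                // child-first lookup
                } catch (final ClassNotFoundException e) {
                    klass = super.loadClass(name, false);   // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(klass);
            }
            return klass;
        }
    }
}
{code}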


was (Author: xakassi):
And I agree with [~ivanyu], I do not understand why *{{PluginClassLoader}}'s 
loadClass* method is marked as synchronized, because it has fine-grained 
synchronization immediately inside.

[~kkonstantine] what do you think about it?

> Deadlock in Kafka Connect
> -
>
> Key: KAFKA-7421
> URL: https://issues.apache.org/jira/browse/KAFKA-7421
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Maciej Bryński
>Assignee: Konstantine Karantasis
>Priority: Major
>
> I'm getting this deadlock on half of the Kafka Connect runs when running two 
> different types of connectors (in this configuration it's Debezium and HDFS).
> Thread 1:
> {code}
> "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Thread 2:
> {code}
> "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>blocks pool-22-thread-2@4748
>waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
> - locked <0x1424> (a java.lang.Object)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> - locked <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> io.debezium.transforms.ByLogicalTableRouter.<clinit>(ByLogicalTableRouter.java:57)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(

[GitHub] [kafka] khaireddine120 commented on pull request #9207: Minor remove semicolon

2020-08-28 Thread GitBox


khaireddine120 commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-682442638


   Hi @bbejeck 
   Thanks for the approval.
   Why is the check taking so long?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-08-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186568#comment-17186568
 ] 

John Roesler commented on KAFKA-10307:
--

Hey [~feyman],

It sounds like you're saying that we do in fact compute the correct result. I 
just want to be clear about that.

I've looked at the code, and while it's very complex it looks like your 
interpretation is accurate, and it is by design.

That said, if you want to take a stab at simplifying it, by all means do so!

However, it sounds like your refactoring is in the scope of 
https://issues.apache.org/jira/browse/KAFKA-9377, and since there's no 
evidence of a bug in the FK join, I'll go ahead and close this ticket as invalid.

Thanks for digging in and checking it!

-John

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
> Attachments: repartition_calc.jpg
>
>
> We have spotted a cyclic topology for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*; we are not sure yet whether this is a bug 
> in the algorithm or the test only. Used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: 
> [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: 
> [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: 
> [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
> Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-SINK-18
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0

[jira] [Resolved] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-08-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10307.
--
Resolution: Not A Bug

This bug report was based on an invalid assumption that topologies wouldn't 
contain cycles. There was a secondary question in the comments about whether we 
computed the correct repartition topic partition counts for such topologies, 
and investigation reveals that we do. So I'm closing this ticket as "not a bug".

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
> Attachments: repartition_calc.jpg
>
>
> We have spotted a cyclic topology for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*; we are not sure yet whether this is a bug 
> in the algorithm or the test only. Used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: 
> [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: 
> [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: 
> [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
> Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-SINK-18
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-SINK-18
>   <-- KTABLE-SOURCE-05
> Sink: KTABLE-SINK-18 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
>   

[jira] [Comment Edited] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-08-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186569#comment-17186569
 ] 

John Roesler edited comment on KAFKA-10307 at 8/28/20, 2:28 PM:


Resolution note:

This bug report was based on an invalid assumption that topologies wouldn't 
contain cycles. There was a secondary question in the comments about whether we 
computed the correct repartition topic partition counts for such topologies, 
and investigation reveals that we do. So I'm closing this ticket as "not a bug".


was (Author: vvcephei):
This bug report was based on an invalid assumption that topologies wouldn't 
contain cycles. There was a secondary question in the comments about whether we 
computed the correct repartition topic partition counts for such topologies, 
and investigation reveals that we do. So I'm closing this ticket as "not a bug".

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
> Attachments: repartition_calc.jpg
>
>
> We have spotted a cyclic topology for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*; we are not sure yet whether this is a bug 
> in the algorithm or the test only. Used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: 
> [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: 
> [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: 
> [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
> Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: 
> [

[GitHub] [kafka] vvcephei commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-28 Thread GitBox


vvcephei commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r479344784



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer<Agg> initializer;
+private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer<Agg> initializer,
+ final Aggregator<? super K, ? super V, Agg> aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor<K, V> get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+private TimestampedWindowStore<K, Agg> windowStore;
+private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+tupleForwarder = new TimestampedTupl

[GitHub] [kafka] vvcephei commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-28 Thread GitBox


vvcephei commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r479356312



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. 
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.serialization
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.UUID
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, 
Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes extends LowPrioritySerdes {
+  implicit def stringSerde: Serde[String] = JSerdes.String()
+  implicit def longSerde: Serde[Long] = 
JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def javaLongSerde: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def byteArraySerde: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = 
JSerdes.Short().asInstanceOf[Serde[Short]]
+  implicit def javaShortSerde: Serde[java.lang.Short] = JSerdes.Short()
+  implicit def floatSerde: Serde[Float] = 
JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def javaFloatSerde: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def doubleSerde: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def javaDoubleSerde: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def intSerde: Serde[Int] = 
JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
+
+  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.TimeWindowedSerde[T] =
+new WindowedSerdes.TimeWindowedSerde[T](tSerde)

Review comment:
   Thanks @LMnet ,
   
   I agree with you about option 2, it seems unlikely to actually work 
automatically and very likely to result in confusing implicit resolution 
conflict compiler errors.
   
   (1) and (3) are both fine with me. (3) seems mildly unnecessary, since it 
would just pass through directly to the constructor of the TimeWindowedSerde. 
Just to be clear, though, I would _not_ recommend:
   
   ```scala
   def timeWindowedSerde[T](tSerde: Serde[T], properties: Properties): 
WindowedSerdes.TimeWindowedSerde[T]
   ```
   
   Rather, I'd suggest:
   ```scala
   def timeWindowedSerde[T](tSerde: Serde[T], windowSize: Long): 
WindowedSerdes.TimeWindowedSerde[T] = new 
WindowedSerdes.TimeWindowedSerde[T](tSerde, windowSize)
   ```
   
   I.e., if you're explicitly constructing the serde anyway, you might as well 
explicitly specify its parameters. The config properties are really for the 
case when the serde is reflectively constructed by the Consumer.
   
   By the way, I agree that the idea of configuring serdes is confusing. 
It's really to support more complex serdes that store their schemas externally 
AND that are constructed reflectively by Streams. For example, instead of 
passing the serdes into the DSL at all (which is what these implicits automate 
for Scala users), you might just set `default.value.serde` to a "json serde" or 
an "avro serde" or a "schema registry serde". These things need access to the 
runtime config, and the mechanism to give it to them is to reflectively 
construct them via the zero-arg constructor and then invoke `configure()` with 
the runtime configs. On the other hand, the serdes in _this_ file, like 
UUIDSerde or LongSerde, don't need any configuration; they already fully 
specify their schemas with no additional input. The TimeWindowedSerde does need 
some additional input (the inner type and the window size). When it's 
constructed reflectively, this information has to be passed to the serde via 
configs,
  but when it's constructed by user code, we can (and should) just pass all the 
information to the constructor.
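
   As a side note, here is a hedged Java sketch of the two construction paths described above; it uses only standard Kafka Streams classes, and the concrete serde and window size are just illustrative choices:

   ```java
   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.kstream.WindowedSerdes;

   import java.util.Properties;

   public class SerdeConstructionSketch {
       public static void main(String[] args) {
           // (a) Reflective construction: Streams instantiates the configured class via its
           //     zero-arg constructor and then calls configure(), so any extra state
           //     (schemas, window size, ...) must come from the runtime config map.
           final Properties props = new Properties();
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

           // (b) Explicit construction in user code: the parameters go straight into the
           //     constructor, which is what the proposed Scala helper would do.
           final long windowSizeMs = 5 * 60 * 1000L;
           final Serde<Windowed<String>> windowedSerde =
               new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), windowSizeMs);
           System.out.println(windowedSerde.getClass().getSimpleName());
       }
   }
   ```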




--

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-28 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r479362665



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer<Agg> initializer;
+private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer<Agg> initializer,
+ final Aggregator<? super K, ? super V, Agg> aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor<K, V> get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+private TimestampedWindowStore<K, Agg> windowStore;
+private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+tupleForwarder = new TimestampedTupleFo

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479383807



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically 
increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when 
appending record. Otherwise, false.
+ *this field is updated after appending record so 
it has default value option.

Review comment:
   The default value is None.

##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -36,8 +36,14 @@ private[group] class DelayedJoin(coordinator: 
GroupCoordinator,
  rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  override def onExpiration(): Unit = {
+coordinator.onExpireJoin()
+// try to complete delayed actions introduced by coordinator.onCompleteJoin
+tryToCompleteDelayedAction()
+  }
+  override def onComplete(): Unit = coordinator.onCompleteJoin(group)
+
+  protected def tryToCompleteDelayedAction(): Unit = 
coordinator.groupManager.replicaManager.tryCompleteDelayedAction()

Review comment:
   This can just be private.

##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -110,31 +109,21 @@ abstract class DelayedOperation(override val delayMs: 
Long,
* of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
* every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
* the operation is actually completed.
+   *

Review comment:
   The above comment is outdated now.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -558,10 +558,27 @@ class ReplicaManager(val config: KafkaConfig,
 localLog(topicPartition).map(_.parentDir)
   }
 
+  // visible for testing
+  val delayedActions = new LinkedBlockingQueue[() => Unit]()
+
+  /**
+   * try to complete delayed action. In order to avoid conflicting locking, 
the actions to complete delayed requests
+   * are kept in a queue. We add the logic to check the ReplicaManager queue 
at the end of KafkaApis.handle(),
+   * at which point, no conflicting locks will be held.
+   */
+  def tryCompleteDelayedAction(): Unit = {
+val action = delayedActions.poll()
+if (action != null) action()
+  }
+
   /**
* Append messages to leader replicas of the partition, and wait for them to 
be replicated to other replicas;
* the callback function will be triggered either when timeout or the 
required acks are satisfied;
* if the callback function itself is already synchronized on some object 
then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations in a queue. All callers 
to ReplicaManager.appendRecords() are

Review comment:
   in a queue => are stored in a queue

##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -110,31 +109,21 @@ abstract class DelayedOperation(override val delayMs: 
Long,
* of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
* every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
* the operation is actually completed.
+   *
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock 
and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes 
deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the 
delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails 
to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to 
complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used 
to complete delayed requests, in a queue.
+   * KafkaApis.handle() picks up and then execute an action when no lock is 
held.

Review comment:
   KafkaApis.handle() => KafkaApis.handle() and the expiration thread for 
certain delayed operations (e.g. DelayedJoin)

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -558,10 +5

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-28 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r479411830



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
+// store the combined window if it is found so that a 
right window can be created for
+// the combined window's max record, as needed
 if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {
 latestLeftTypeWindow = next;
 }
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+// if current record's left window is the combined 
window, need to check later if there is a
+// record that needs a right window within the 
combined window
+if (endTime == windows.timeDifferenceMs()) {
+latestLeftTypeWindow = next;
+}

Review comment:
   Would the window's max timestamp always be larger than the previous 
window's, by nature of moving forward in time? I think it would be somewhat more 
convenient, but I feel like adding additional boolean checks makes the algorithm 
feel longer, especially if the issue will get handled in later checks anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-682879578


   @junrao Thanks for all the suggestions!
   
   Is Jenkins on vacation? Could you trigger a system test?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10444) Migrate PR jobs to new Apache Infra Jenkins

2020-08-28 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-10444:
---

Assignee: David Arthur

> Migrate PR jobs to new Apache Infra Jenkins
> ---
>
> Key: KAFKA-10444
> URL: https://issues.apache.org/jira/browse/KAFKA-10444
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>Assignee: David Arthur
>Priority: Blocker
>
> The old Jenkins is disabled. A job in the new one is 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/]
> Apache Infra decided not to add the PR builder plugin we are using in the new 
> Jenkins, so we have to migrate to a new one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10444) Migrate PR jobs to new Apache Infra Jenkins

2020-08-28 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-10444:
---

 Summary: Migrate PR jobs to new Apache Infra Jenkins
 Key: KAFKA-10444
 URL: https://issues.apache.org/jira/browse/KAFKA-10444
 Project: Kafka
  Issue Type: Task
Reporter: Ismael Juma


The old Jenkins is disabled. A job in the new one is 
[https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/]

Apache Infra decided not to add the PR builder plugin we are using in the new 
Jenkins, so we have to migrate to a new one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


ijuma commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-682991870


   @chia7712 See KAFKA-10444 with regards to Jenkins.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-28 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-10434:
-
Labels: kip-required  (was: )

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kip-required
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10445) Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore

2020-08-28 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-10445:


 Summary: Align IQ SessionStore API with Instant-based methods as 
ReadOnlyWindowStore
 Key: KAFKA-10445
 URL: https://issues.apache.org/jira/browse/KAFKA-10445
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jorge Esteban Quilcate Otoya






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10445) Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore

2020-08-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10445:

Labels: needs-kip  (was: )

> Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore
> ---
>
> Key: KAFKA-10445
> URL: https://issues.apache.org/jira/browse/KAFKA-10445
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: needs-kip
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10445) Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore

2020-08-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10445:

Labels: beginner needs-kip newbie  (was: needs-kip)

> Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore
> ---
>
> Key: KAFKA-10445
> URL: https://issues.apache.org/jira/browse/KAFKA-10445
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479467548



##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -100,41 +99,21 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
-   * without blocking.
*
-   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock 
and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes 
deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the 
delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails 
to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to 
complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used 
to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed 
operations (e.g. DelayedJoin)

Review comment:
   The last sentence doesn't complete.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683052000


   @ijuma @hachikuji @rajinisivaram : Do you want to take another look at the 
latest solution from Chia-Ping? It (1) solves the known issues completely; (2) 
doesn't require new threads; (3) adds minimal changes to existing code; (4) 
simplifies existing code by removing the tryLock logic.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479474904



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -100,7 +102,8 @@ case class LogAppendInfo(var firstOffset: Option[Long],
  offsetsMonotonic: Boolean,
  lastOffsetOfFirstBatch: Long,
  recordErrors: Seq[RecordError] = List(),
- errorMessage: String = null) {
+ errorMessage: String = null,
+ leaderHWIncremented: Option[Boolean] = None) {

Review comment:
   Can we not use a `boolean` here? `false` until it's been incremented and 
then `true`. Is there value in having the third state?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479475681



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -184,6 +184,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   case e: FatalExitError => throw e
   case e: Throwable => handleError(request, e)
 } finally {
+  replicaManager.tryCompleteDelayedAction()

Review comment:
   Should we be guarding against exceptions here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr closed pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-28 Thread GitBox


dielhennr closed pull request #9101:
URL: https://github.com/apache/kafka/pull/9101


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-28 Thread GitBox


ijuma commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683075891


   This looks promising. One question, do we want every request to drain this 
ReplicaManager queue or only the callers of `appendRecords`? I think this 
answer affects the design a bit.
   
   If it's meant to be called by every request, then maybe we should have the 
delayed actions in a separate class instead of ReplicaManager. Other classes 
could, in theory, add their own delayed actions to this queue too.
   
   If it's meant to be called after calling `appendRecords`, then it may be 
cleaner to add the call within the method that calls `appendRecords` (with 
maybe a helper method in `KafkaApis` to make it less error prone).
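
   To make the first option concrete, a minimal sketch of what such a separate class could look like (the name and API are hypothetical, not actual Kafka code):

   ```java
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Hypothetical helper: handlers enqueue completion work while they still hold
   // group/partition locks, and the request thread drains the queue afterwards,
   // when no locks are held.
   final class DelayedActionQueue {
       private final Queue<Runnable> actions = new ConcurrentLinkedQueue<>();

       void add(final Runnable action) {
           actions.add(action);
       }

       void tryCompleteActions() {
           Runnable action;
           while ((action = actions.poll()) != null) {
               action.run();
           }
       }
   }
   ```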



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-28 Thread GitBox


dielhennr commented on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hi @jsancio I hope you're doing well. I added some work in progress to this 
branch that includes new APIs and basic functionality for this feature using 
the APIs. Fitting user and client-id into the `DescribeConfigs` API was awkward 
so I thought that the next best step would be to create a specialized set of 
APIs, similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 I'm wondering if I should create a new KIP and branch so that the old 
implementation can be referenced without digging into commit or page history. 
Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support, and I'm running into a few issues. I tried tying the registration to 
connectionId. This includes ip:port of the client as well as the broker. The 
issue here is that even if registration was tied to ip:port of a client, the 
client talks to the least loaded node when requesting configs. If the least 
loaded node is different than the last one the client talked to, the client 
will use a different port. This leads me to believe that tying supported config 
registration to the port of a client will not work. Would it be safe to assume 
that clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-28 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hi @jsancio I hope you're doing well. I added some work in progress to this 
branch that includes new APIs and basic functionality for this feature using 
the new APIs. Fitting user and client-id into the `DescribeConfigs` API was 
awkward so I thought that the next best step would be to create a specialized 
set of APIs, similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 I'm wondering if I should create a new KIP and branch so that the old 
implementation can be referenced without digging into commit or page history. 
Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support, and I'm running into a few issues. I tried tying the registration to 
connectionId. This includes ip:port of the client as well as the broker. The 
issue here is that even if registration was tied to ip:port of a client, the 
client talks to the least loaded node when requesting configs. If the least 
loaded node is different than the last one the client talked to, the client 
will use a different port. This leads me to believe that tying supported config 
registration to the port of a client will not work. Would it be safe to assume 
that clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479545936



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<DescribeUserScramCredentialsResponseData> dataFuture;
+
+/**
+ * Package-private constructor
+ *
+ * @param dataFuture the future indicating response data from the call
+ */
+DescribeUserScramCredentialsResult(KafkaFuture<DescribeUserScramCredentialsResponseData> dataFuture) {
+this.dataFuture = Objects.requireNonNull(dataFuture);
+}
+
+/**
+ *
+ * @return a future for the results of all described users with map keys 
(one per user) being consistent with the
+ * contents of the list returned by {@link #users()}. The future will 
complete successfully only if all such user
+ * descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+return KafkaFuture.allOf(dataFuture).thenApply(v -> {

Review comment:
   There's no reason to call `allOf` on a single future.  After all, 
`allOf`'s function is to convert multiple futures into a single one.  But if you 
already have a single future, this is not needed.
   
   You could use `dataFuture.thenApply`, but that will not trigger if 
`dataFuture` is completed exceptionally.
   
   Instead, what you want here is something like the following:
   `dataFuture.whenComplete( (data, exception) -> if (exception != null) { ... 
error ... }  else { ... handle data ... }`
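
   A minimal, compilable sketch of that pattern, assuming the `dataFuture` field from the diff above; `handleError`/`handleData` are placeholders rather than code from this PR:

```java
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;

// Minimal sketch of the whenComplete-based handling suggested above;
// handleError/handleData are placeholders, not methods from this PR.
class WhenCompleteSketch {
    void handle(KafkaFuture<DescribeUserScramCredentialsResponseData> dataFuture) {
        dataFuture.whenComplete((data, exception) -> {
            if (exception != null) {
                handleError(exception);   // runs when dataFuture completed exceptionally
            } else {
                handleData(data);         // runs when dataFuture completed with response data
            }
        });
    }

    private void handleError(Throwable t) { /* e.g. surface the error to the caller */ }

    private void handleData(DescribeUserScramCredentialsResponseData data) { /* e.g. build the result map */ }
}
```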





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479546505



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<DescribeUserScramCredentialsResponseData> dataFuture;
+
+/**
+ * Package-private constructor
+ *
+ * @param dataFuture the future indicating response data from the call
+ */
+DescribeUserScramCredentialsResult(KafkaFuture<DescribeUserScramCredentialsResponseData> dataFuture) {
+this.dataFuture = Objects.requireNonNull(dataFuture);
+}
+
+/**
+ *
+ * @return a future for the results of all described users with map keys 
(one per user) being consistent with the
+ * contents of the list returned by {@link #users()}. The future will 
complete successfully only if all such user
+ * descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+return KafkaFuture.allOf(dataFuture).thenApply(v -> {
+DescribeUserScramCredentialsResponseData data = 
valueFromFutureGuaranteedToSucceedAtThisPoint(dataFuture);
+/* Check to make sure every individual described user succeeded.  
Note that a successfully described user
+ * is one that appears with *either* a NONE error code or a 
RESOURCE_NOT_FOUND error code. The
+ * RESOURCE_NOT_FOUND means the client explicitly requested a 
describe of that particular user but it could
+ * not be described because it does not exist; such a user will 
not appear as a key in the returned map.
+ */
+Optional<DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult> optionalFirstFailedDescribe =
+data.results().stream().filter(result ->
+result.errorCode() != Errors.NONE.code() && 
result.errorCode() != Errors.RESOURCE_NOT_FOUND.code()).findFirst();
+if (optionalFirstFailedDescribe.isPresent()) {
+throw 
Errors.forCode(optionalFirstFailedDescribe.get().errorCode()).exception(optionalFirstFailedDescribe.get().errorMessage());
+}
+Map<String, UserScramCredentialsDescription> retval = new HashMap<>();
+data.results().stream().forEach(userResult ->
+retval.put(userResult.user(), new UserScramCredentialsDescription(userResult.user(),
+getScramCredentialInfosFor(userResult))));
+return retval;
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that meet the request 
criteria and that have at least one
+ * credential.  The future will not complete successfully if the user is 
not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period. Note 
that the returned list will not include users
+ * that do not exist/have no credentials: a request to describe an 
explicit list of users, none of which existed/had
+ * a credential, will result in a future that returns an empty list being 
returned here. A returned list will
+ * include users that have a credential but that could not be described.
+ */
+public KafkaFuture<List<String>> users() {
+return KafkaFuture.allOf(dataFuture).thenApply(v -> {
+DescribeUserScramCredentialsRespon

[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479548121



##
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import java.util

Review comment:
   This should go with the other java imports, which I guess are combined 
in this file





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479548642



##
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##
@@ -166,14 +168,11 @@ class ClientQuotasRequestTest extends BaseRequestTest {
 
   @Test
   def testClientQuotasForScramUsers(): Unit = {
-val entityType = ConfigType.User
 val userName = "user"
 
-val mechanism = ScramMechanism.SCRAM_SHA_256
-val credential = new 
ScramFormatter(mechanism).generateCredential("password", 4096)
-val configs = adminZkClient.fetchEntityConfig(entityType, userName)
-configs.setProperty(mechanism.mechanismName, 
ScramCredentialUtils.credentialToString(credential))
-adminZkClient.changeConfigs(entityType, userName, configs)
+val results = 
createAdminClient().alterUserScramCredentials(util.Arrays.asList(

Review comment:
   You need to close the admin client after creating it.  I don't think 
Scala has try-with-resources, so you can just use a `try`... `finally` block.
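
   For illustration, the same close-after-use discipline written in Java; the Scala test would wrap its `alterUserScramCredentials` call in the analogous `try`/`finally`. The class and method names here are made up for the sketch:

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Close-after-use in Java form; the Scala test would use the analogous
// try ... finally around its alterUserScramCredentials call.
class AdminCloseSketch {
    static void withAdmin(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        Admin admin = Admin.create(props);
        try {
            // ... issue alterUserScramCredentials / other admin calls here ...
        } finally {
            admin.close();   // release sockets and threads even if the calls above throw
        }
    }
}
```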





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479551061



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = 
None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : 
ConfigCommandResult = {
+val byteArrayOutputStream = new ByteArrayOutputStream()
+val utf8 = StandardCharsets.UTF_8.name
+val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+var exitStatus: Option[Int] = None
+Exit.setExitProcedure { (status, _) =>
+  exitStatus = Some(status)
+  throw new RuntimeException
+}
+try {
+  Console.withOut(printStream) {
+ConfigCommand.main(Array("--bootstrap-server", brokerList) ++ args)
+  }
+  ConfigCommandResult(byteArrayOutputStream.toString(utf8))
+} catch {
+  case e: Exception => {

Review comment:
   is it useful to log the exception here at debug level?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479552810



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -1047,8 +1047,8 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
   @Test
   def testAddRemoveSaslListeners(): Unit = {
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
+createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
+createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
 initializeKerberos()

Review comment:
   Is it necessary to wait for the change to be applied on all brokers 
after completing the admin client call?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479553827



##
File path: 
core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
##
@@ -42,7 +42,18 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   override def setUp(): Unit = {
 super.setUp()
 // Create client credentials after starting brokers so that dynamic 
credential creation is also tested
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
+createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
+createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
+  }
+
+  private def createScramCredentialWithScramAdminClient(user: String, 
password: String) = {

Review comment:
   So the reason for not using the `SaslSetup#createScramCredentials` method here is that we want the admin client itself to be authenticated with SCRAM?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479554324



##
File path: 
core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -248,4 +250,25 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
 producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
 createProducer()
   }
+
+  private def createScramAdminClient(user: String, password: String): Admin = {

Review comment:
   It feels like we are accumulating a lot of these "create an admin 
client, but with SCRAM" functions.  Since all these tests ultimately subclass 
SaslSetup, can't we have a common function there?
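
   A sketch of the configs such a shared helper would end up setting, written in Java for brevity; the class and method names are hypothetical, and the security protocol would have to match the test listener (SASL_SSL or SASL_PLAINTEXT depending on the harness):

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;

// Hypothetical shared helper; the class/method names are made up, but the
// configs are what any SCRAM-authenticated admin client needs to set.
final class ScramAdminClients {
    static Admin createScramAdmin(String bootstrapServers, String mechanism,
                                  String user, String password) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // match the test listener's security protocol (e.g. SASL_SSL or SASL_PLAINTEXT)
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);   // e.g. "SCRAM-SHA-256"
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"" + user + "\" password=\"" + password + "\";");
        return Admin.create(props);
    }
}
```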





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-28 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r479554911



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 }
   }
 
+  protected def createScramAdminClient(scramMechanism: String, user: String, 
password: String): Admin = {

Review comment:
   can we create a common function in SaslSetup for this?  Seems to be 
repeated in a lot of tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org