[GitHub] [kafka] chia7712 commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings

2020-11-04 Thread GitBox


chia7712 commented on pull request #9365:
URL: https://github.com/apache/kafka/pull/9365#issuecomment-721617530


   Is it similar to https://issues.apache.org/jira/browse/KAFKA-10090? 







[GitHub] [kafka] chia7712 commented on a change in pull request #9547: KAFKA-9630; Migrate OffsetsForLeaderEpochRequest/Response to the auto-generated protocol

2020-11-04 Thread GitBox


chia7712 commented on a change in pull request #9547:
URL: https://github.com/apache/kafka/pull/9547#discussion_r517212012



##
File path: 
clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
##
@@ -32,13 +32,13 @@
 { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
   "about": "Each topic to get offsets for.", "fields": [
   { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",

Review comment:
   The previous "name" is ```topic``` 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java#L25)
 and the "name" in the auto-generated protocol is ```name```. Does this break 
compatibility, for example in a cluster that mixes the auto-generated protocol 
with the stale protocol?
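   For background, the "name" in the message JSON only affects the generated 
Java accessors; the Kafka wire format writes fields positionally per version, 
never by name, so a schema-level rename by itself should not change the bytes 
exchanged in a mixed cluster. A minimal sketch of the non-flexible string 
encoding (illustrative, not the generated serializer):
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   public class WireEncodingSketch {
       public static void main(String[] args) {
           // Whether the schema calls the field "topic" or "name", non-flexible
           // versions encode the same bytes: an int16 length followed by UTF-8.
           byte[] utf8 = "demo-topic".getBytes(StandardCharsets.UTF_8);
           ByteBuffer buf = ByteBuffer.allocate(2 + utf8.length);
           buf.putShort((short) utf8.length);
           buf.put(utf8);
           buf.flip();
           System.out.println("bytes on the wire: " + buf.remaining());
       }
   }
   ```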









[GitHub] [kafka] chia7712 commented on pull request #8953: MINOR: re-enable EosBetaUpgradeIntegrationTest

2020-11-04 Thread GitBox


chia7712 commented on pull request #8953:
URL: https://github.com/apache/kafka/pull/8953#issuecomment-721626362


   ```EosBetaUpgradeIntegrationTest``` has become unstable recently. Maybe we 
should disable it again :(
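   If it does get disabled, a minimal sketch of what that looks like under 
JUnit 4 (the class and method names here are illustrative, not the actual 
test):
   ```java
   import org.junit.Ignore;
   import org.junit.Test;

   public class EosBetaUpgradeIntegrationTestSketch {
       @Ignore("Flaky; disabled until the instability is diagnosed")
       @Test
       public void shouldUpgradeFromEosAlphaToEosBeta() {
           // test body elided
       }
   }
   ```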







[GitHub] [kafka] tombentley commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings

2020-11-04 Thread GitBox


tombentley commented on pull request #9365:
URL: https://github.com/apache/kafka/pull/9365#issuecomment-721626482


   @chia7712 I think there's overlap, but this one removes additional 
warnings because it also propagates usage tracking into `SslFactory`. (It also 
provides a non-public abstraction to at least make it easier to identify the 
places where we end up doing this usage tracking, which might be useful should 
we ever figure out a better way of doing this.)
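   A hedged sketch of what such a usage-tracking abstraction could look like; 
the class and method names here are hypothetical, not the PR's actual code:
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   class UsageTrackingConfig {
       private final Map<String, Object> values;
       private final Set<String> used = new HashSet<>();

       UsageTrackingConfig(Map<String, Object> values) {
           this.values = new HashMap<>(values);
       }

       Object get(String key) {
           used.add(key); // mark the key as read so it is not reported as unused
           return values.get(key);
       }

       Set<String> unused() {
           // Keys never read: these are what would otherwise trigger the
           // erroneous "supplied but not used" warnings.
           Set<String> unused = new HashSet<>(values.keySet());
           unused.removeAll(used);
           return unused;
       }
   }
   ```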







[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-11-04 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-721637971


   > Took way too long to get this merged.
   
   My fault for screwing up several times. Thanks for not giving up on this and 
all the time dedicated to reviewing.







[GitHub] [kafka] bbejeck commented on a change in pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs

2020-11-04 Thread GitBox


bbejeck commented on a change in pull request #9554:
URL: https://github.com/apache/kafka/pull/9554#discussion_r517339026



##
File path: docs/streams/developer-guide/testing.html
##
@@ -73,67 +71,55 @@ // Processor API
 Topology topology = new Topology();
 topology.addSource("sourceProcessor", "input-topic");
 topology.addProcessor("processor", ..., "sourceProcessor");
-topology.addSink("sinkProcessor", "result-topic", "processor");
+topology.addSink("sinkProcessor", "output-topic", "processor");
 // or
 // using DSL
 StreamsBuilder builder = new StreamsBuilder();
-builder.stream("input-topic").filter(...).to("result-topic");
+builder.stream("input-topic").filter(...).to("output-topic");

Review comment:
   good catch! Thanks









[GitHub] [kafka] bbejeck commented on a change in pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs

2020-11-04 Thread GitBox


bbejeck commented on a change in pull request #9554:
URL: https://github.com/apache/kafka/pull/9554#discussion_r517339602



##
File path: docs/streams/upgrade-guide.html
##
@@ -135,14 +134,12 @@ Streams API
 tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
 for full details, including several new configs for control over this 
new feature.
 
-
 
 New end-to-end latency metrics have been added. These task-level 
metrics will be logged at the INFO level and report the min and max end-to-end 
latency of a record at the beginning/source node(s)
 and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more 
information.
 
-
 
-As of 2.6.0 Kafka Streams deprecates KStream.through() in 
favor of the new KStream.repartition() operator
+As of 2.6.0 Kafka Streams deprecates KStream.through() if 
favor of the new KStream.repartition() operator

Review comment:
   ack









[GitHub] [kafka] bbejeck merged pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs

2020-11-04 Thread GitBox


bbejeck merged pull request #9554:
URL: https://github.com/apache/kafka/pull/9554


   







[GitHub] [kafka] bbejeck commented on pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs

2020-11-04 Thread GitBox


bbejeck commented on pull request #9554:
URL: https://github.com/apache/kafka/pull/9554#issuecomment-721732656


   Merged #9554 into trunk







[GitHub] [kafka] bbejeck commented on pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs

2020-11-04 Thread GitBox


bbejeck commented on pull request #9554:
URL: https://github.com/apache/kafka/pull/9554#issuecomment-721734486


   cherry-picked to 2.7







[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


dajac commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517317354



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   It seems that `verbose` is not used anymore. I think that we were 
dumping the full Produce request and Fetch response when `verbose` was `true`. 
Should we just remove it?

##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJso

[GitHub] [kafka] cadonna commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


cadonna commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517426911



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -145,7 +145,7 @@
 private final String clientId;
 private final Metrics metrics;
 private final StreamsConfig config;
-protected final StreamThread[] threads;
+protected final ArrayList<StreamThread> threads;

Review comment:
   I would prefer to use `List` instead of `ArrayList` to be more generic.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[numStreamThreads];
-
+threads = new ArrayList<>(numStreamThreads);

Review comment:
   I think it is better to keep the default initial capacity of an 
`ArrayList`. Otherwise, the first time a stream thread is added beyond that 
initial count, we immediately run into a reallocation. Since we do not know how 
many stream threads to expect, let's use the default.   
   We could also consider using a `LinkedList`, since we never access by index 
in production code.
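   A compact sketch of the trade-offs in the two comments above (stand-alone 
names, not the PR's code):
   ```java
   import java.util.ArrayList;
   import java.util.LinkedList;
   import java.util.List;

   public class ThreadListSketch {
       // Declare as List (more generic), instantiate with the default capacity.
       private final List<Thread> threads = new ArrayList<>();

       public static void main(String[] args) {
           // new ArrayList<>(n) allocates exactly n slots up front, so adding an
           // (n+1)-th element forces an immediate array copy; the default grows
           // lazily on first add.
           List<String> exact = new ArrayList<>(4);
           for (int i = 0; i < 5; i++) exact.add("thread-" + i); // 5th add reallocates
           // LinkedList appends in O(1) but get(i) is O(n).
           List<String> linked = new LinkedList<>(exact);
           System.out.println(exact.size() + " " + linked.size());
       }
   }
   ```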

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);
+threadState.put(threads.get(i).getId(), threads.get(i).state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(threads.get(i)));
 }
 
 ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(Arrays.stream(threads).filter(thread -> 
thread.state().isAlive()).count()));
+Math.toIntExact(Arrays.stream(threads.toArray(new 
StreamThread[numStreamThreads])).filter(thread -> 
thread.state().isAlive()).count()));

Review comment:
   Please simplify to
   ```suggestion
   Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);
+threadState.put(threads.get(i).getId(), threads.get(i).state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
   You can simplify to
   ```suggestion
   threads.add(streamThread);
   threadState.put(streamThread.getId(), streamThread.state());
   storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
   ```









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-04 Thread GitBox


vamossagar12 commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r517462186



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   Alright. Thanks @hachikuji, I will make the changes and update the PR.









[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


cadonna commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517464968



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
##
@@ -19,7 +19,7 @@
 public final class StreamsAssignmentProtocolVersions {
 public static final int UNKNOWN = -1;
 public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;

Review comment:
   Could you please also add the needed changes for the system test 
`streams_upgrade_test.py::StreamsUpgradeTest.test_version_probing_upgrade` to 
this PR?









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-04 Thread GitBox


vamossagar12 commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r517462186



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   Alright. Thanks @hachikuji. So, I need to add voters + voted for the 
current leader in the LeaderChange message and see how to pass the latter to 
LeaderState.









[jira] [Assigned] (KAFKA-10679) AK site docs changes need to get ported to Kafka/docs

2020-11-04 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-10679:
---

Assignee: Bill Bejeck

> AK site docs changes need to get ported to Kafka/docs
> -
>
> Key: KAFKA-10679
> URL: https://issues.apache.org/jira/browse/KAFKA-10679
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Affects Versions: 2.7.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> During the update of the Apache Kafka website, changes made to the kafka-site 
> repo were not made to the kafka/docs directory.
> All the changes made need to get migrated to kafka/docs to keep the website 
> in sync. 





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517481718



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,66 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException @NotNull if 
streamsUncaughtExceptionHandler is null.

Review comment:
   I don't remember putting it there, so it was probably a mistake.
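   For reference, a hedged sketch of registering the handler this Javadoc 
documents, assuming the `StreamThreadExceptionResponse` enum from KIP-671; 
`kafkaStreams` and `log` are assumed to exist, and the final API may differ 
from this in-flight PR:
   ```java
   kafkaStreams.setUncaughtExceptionHandler(exception -> {
       // Decide per exception whether to replace the thread or shut down.
       log.error("Stream thread died unexpectedly", exception);
       return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
   });
   ```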









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517485257



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,6 +1064,72 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+// if transition failed, it means it was either in PENDING_SHUTDOWN
+// or NOT_RUNNING already; just check that all threads have been 
stopped
+log.info("Can not close to error from state " + state());

Review comment:
   That works









[jira] [Created] (KAFKA-10681) MM2 translateOffsets returns wrong offsets

2020-11-04 Thread Carlo Bongiovanni (Jira)
Carlo Bongiovanni created KAFKA-10681:
-

 Summary: MM2 translateOffsets returns wrong offsets
 Key: KAFKA-10681
 URL: https://issues.apache.org/jira/browse/KAFKA-10681
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.5.0
 Environment: GKE, strimzi release
Reporter: Carlo Bongiovanni


Hi all,

we'd like to make use of the ability of MM2 to mirror checkpoints of consumer 
offsets, in order to have a graceful failover from an active cluster to a 
standby one.

For this reason we have created the following setup (FYI all done with strimzi 
on k8s):
 * an active kafka cluster 2.5.0 used by a few producers/consumers
 * a standby kafka cluster 2.5.0
 * MM2 is setup in one direction only to mirror from active to standby

We let MM2 run for some time and verified that messages are effectively 
mirrored.

At this point we started developing tooling to create consumer groups in the 
consumer-offsets topic of the passive cluster by reading the internal 
checkpoints topic.

The following is an extract of our code to read the translated offsets:


{code:java}
Map<String, Object> mm2Props = new HashMap<>();
 mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
 mm2Props.put("source.cluster.alias", "euwe");
 mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
 mm2Props.put(SASL_JAAS_CONFIG, 
"org.apache.kafka.common.security.scram.ScramLoginModule required 
username=\"user\" password=\"password\";");
 mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
 mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, 
"/usr/local/lib/jdk/lib/security/cacerts");
 mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
 .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), cgi,
 Duration.ofSeconds(60L));
{code}
 

Before persisting the translated offsets with 
{code:java}
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
 .alterConsumerGroupOffsets(cgi, offsets);{code}
we filter them because we don't want to create consumer groups for all 
retrieved offsets.

During the filtering, we compare the translated offset value for each topic 
partition (as coming from the checkpoint topic) with the respective current 
offset value for that topic partition (as mirrored by MM2).
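A hedged sketch of such a filtering step, assuming the `kafkaClient` admin 
client and `translatedOffsets` map from the snippets above (imports and 
exception handling elided; this is not the reporter's actual code):

{code:java}
// Keep only translated offsets that do not exceed the target partition's
// current end offset; anything larger is suspect.
Map<TopicPartition, OffsetSpec> latest = translatedOffsets.keySet().stream()
 .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
 kafkaClient.listOffsets(latest).all().get();
Map<TopicPartition, OffsetAndMetadata> filtered = translatedOffsets.entrySet().stream()
 .filter(e -> e.getValue().offset() <= endOffsets.get(e.getKey()).offset())
 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
{code}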

While running this check, we found that for some topics there is a big 
difference between those values, while for other topics the update seems 
realistic.

For example, looking at a given target partition, we see it has an offset of 
100 (after mirroring by MM2). From the checkpoint topic for the same consumer 
group id, we receive offset 200, and later 150.

The issues are that:
 * both consumer group id offsets exceed the real offset of the partition
 * the consumer group id offsets from the checkpoint topic go down over time, 
not up

We haven't been able to explain it; the wrong numbers come from 
*RemoteClusterUtils.translateOffsets()*, and we're wondering whether this is a 
misconfiguration on our side or a bug in MM2.

Thanks, best
 C.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517516580



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[numStreamThreads];
-
+threads = new ArrayList<>(numStreamThreads);

Review comment:
   That is fair









[GitHub] [kafka] wcarlson5 commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-721881768


   @cadonna Looks like all those changes made sense







[jira] [Assigned] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-04 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-7556:
-

Assignee: Justine Olshan

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Assignee: Justine Olshan
>Priority: Critical
>  Labels: documentation, usability
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result of that, the effective start offset of those partitions moves forward; 
> it simply will be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offsets for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> test.topic-14=(timestamp=1511266308829, offset=80962), 
> test.topic-71=(timestamp=1511265474740, offset=83607), 
> test.topic-38=(timestamp=1511268268259, offset=166460), 
> test.topic-5=(timestamp=1511276243850, offset=294307), 
> test.topic-96=(timesta
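
For reference, a minimal sketch reproducing the comparison described in this 
issue; `consumer` is an assumed, already-configured `KafkaConsumer`, and 
imports are elided:

{code:java}
Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test.topic", 0));
// Javadoc says "first offset", but on a retention-pruned topic this returned 0:
Map<TopicPartition, Long> beginning = consumer.beginningOffsets(partitions);
// Querying by timestamp 0L (UNIX epoch) returned the effective start offsets:
Map<TopicPartition, Long> epoch = new HashMap<>();
for (TopicPartition tp : partitions) epoch.put(tp, 0L);
Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(epoch);
System.out.println(beginning + " vs " + byTime);
{code}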

[GitHub] [kafka] vvcephei commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


vvcephei commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517539432



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[numStreamThreads];
-
+threads = new LinkedList<>();

Review comment:
   Should this collection be threadsafe? (or are all accesses inside 
synchronized blocks anyway?)
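   If it does need to be threadsafe, one option is a synchronized wrapper (a 
sketch with stand-alone names, not necessarily what this PR should do):
   ```java
   import java.util.Collections;
   import java.util.LinkedList;
   import java.util.List;

   class ThreadSafeThreadsSketch {
       // Structural operations are synchronized on the wrapper itself;
       // iteration still needs an explicit synchronized (threads) block.
       private final List<Thread> threads = Collections.synchronizedList(new LinkedList<>());
   }
   ```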

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
   ```suggestion
   storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
   ```
   
   `get(i)` is also O(n) for a linked list. Again, this is admittedly a nit, 
since the list is small.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);

Review comment:
   A bit of a nitpick, but this operation is O(n) for LinkedList. Better to 
just `add(streamThread)` if you want to use LinkedList.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517543638



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
##
@@ -19,7 +19,7 @@
 public final class StreamsAssignmentProtocolVersions {
 public static final int UNKNOWN = -1;
 public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;

Review comment:
   Can you also leave a comment here reminding us to fix the version-probing 
system test whenever this protocol number is bumped? We apparently always 
forget.
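   Something along these lines (the comment wording and test path are 
illustrative):
   ```java
   public final class StreamsAssignmentProtocolVersions {
       public static final int UNKNOWN = -1;
       public static final int EARLIEST_PROBEABLE_VERSION = 3;
       // NOTE: when bumping this, also update the version-probing expectations
       // in tests/kafkatest/tests/streams/streams_upgrade_test.py -- we always forget.
       public static final int LATEST_SUPPORTED_VERSION = 9;
   }
   ```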









[GitHub] [kafka] ableegoldman commented on pull request #8953: MINOR: re-enable EosBetaUpgradeIntegrationTest

2020-11-04 Thread GitBox


ableegoldman commented on pull request #8953:
URL: https://github.com/apache/kafka/pull/8953#issuecomment-721896700


   I believe @mjsax is planning to look into it after his current work, but it 
seems reasonable to disable it again until then.







[GitHub] [kafka] niteshmor opened a new pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-04 Thread GitBox


niteshmor opened a new pull request #9556:
URL: https://github.com/apache/kafka/pull/9556


   Jetty 9.4.32 and earlier are affected by CVE-2020-27216. This vulnerability 
is fixed in Jetty 9.4.33; please see the Jetty project security advisory for 
details: 
https://github.com/eclipse/jetty.project/security/advisories/GHSA-g3wg-6mcf-8jj6#advisory-comment-63053
   
   Unit tests and integration tests pass locally after the upgrade.
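   For reference, the change itself is a one-line bump of the Jetty entry in 
`gradle/dependencies.gradle`. A sketch follows; the `v20201020` date suffix is 
the assumed Jetty artifact qualifier for 9.4.33, so verify against the 
released artifact:
   ```
   // gradle/dependencies.gradle (sketch; only the jetty entry changes):
   jetty: "9.4.33.v20201020",
   ```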
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-04 Thread GitBox


niteshmor commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-721898016


   For applying this fix to older branches, this commit can be cherry-picked to 
   the `2.7` branch. For branches `2.6` and earlier, the `jersey` version also 
   needs to be upgraded to `2.31`, based on the following note in the `2.5` and 
   `2.6` branches, which are on jetty `9.4.24`: 
https://github.com/apache/kafka/blob/2.6/gradle/dependencies.gradle#L71
   
   For validating that this change can be backported, I have prepared commits in 
   my own fork for older branches going back to 2.4 and ran the tests locally.
   To assist in applying the fix to older branches, these can serve as a 
reference
   for what the exact change is. Or I can create new pull requests to the older
   branches if that's easier.
   
   - For 2.7, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/8b51bd18f4541d1edb7d423a01314f78f1988fe8
   - For 2.6, 2 tests failed locally, but I believe they are unrelated. The 
failed tests are:
 - `SslAdminIntegrationTest.testCreateTopicsResponseMetadataAndConfig`
 - `ReplicaManagerTest.testFencedErrorCausedByBecomeLeader`
 Reference commit: 
https://github.com/niteshmor/kafka/commit/a0d3dc0f2e490d04b324ad72c47cd1363cc9dd6c
   - For 2.5, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/f8f68c334132dd293d96d5be1a1c3d307896cefa
   - For 2.4, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/9c08944c3c793c7733e0971a68dcb580dd4c288f
  







[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-04 Thread GitBox


niteshmor commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-721900447


   cc @ijuma @kkonstantine @rhauch @C0urante 







[jira] [Created] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks

2020-11-04 Thread navin (Jira)
navin created KAFKA-10682:
-

 Summary: Windows Kafka cluster not reachable via Azure data Bricks
 Key: KAFKA-10682
 URL: https://issues.apache.org/jira/browse/KAFKA-10682
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.6.0
Reporter: navin


We have a Windows Kafka cluster:
 * We enabled inbound and outbound for port 9092/9093
 * The topic returns results in a local Windows cmd session using
 ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning 
--bootstrap-server 10.53.56.140:9092
 * We are trying to consume the topic from Azure Databricks
 ** df = spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
 .option("subscribe", "SIP.SIP.SHIPMENT") \
 .option("minPartitions", "10") \
 .option("startingOffsets", "earliest") \
 .load()
#df.isStreaming() # Returns True for DataFrames that have streaming sources

df.printSchema()

 ** Display(df)

On using the display command, after some amount of time we got the below error:

Lost connection to cluster. The notebook may have been detached or the cluster 
may have been terminated due to an error in the driver such as an 
OutOfMemoryError.

  What we see in the logs is the below error:

20/11/04 18:23:52 WARN NetworkClient: [Consumer 
clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
 
groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
 Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)20/11/04 
18:23:52 WARN NetworkClient: [Consumer 
clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
 
groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
 Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: 
null)java.net.UnknownHostException: Navin.us.corp.tim.com at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281) at 
java.net.InetAddress.getAllByName(InetAddress.java:1193) at 
java.net.InetAddress.getAllByName(InetAddress.java:1127) at 
kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) 
at 
kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
 at 
kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
 at 
kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
 at 
kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
 at 
kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
 at 
kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
 at 
kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
 at 
kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) 
at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
 at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
 at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
 at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
 at 
kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
 at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
 at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
 at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
 at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569)
 at 
org.apache.spark.sql.kafka010.KafkaOffsetReade

[jira] [Updated] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks

2020-11-04 Thread navin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

navin updated KAFKA-10682:
--
Description: 
We have a Windows Kafka cluster:
 * We enabled inbound and outbound for port 9092/9093
 * The topic returns results in a local Windows cmd session using
 ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning 
--bootstrap-server 10.53.56.140:9092
 * We are trying to consume the topic from Azure Databricks
 ** Simple ping and telnet work fine and connect to the underlying server 
 ** df = spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
 .option("subscribe", "SIP.SIP.SHIPMENT") \
 .option("minPartitions", "10") \
 .option("startingOffsets", "earliest") \
 .load()
 #df.isStreaming() # Returns True for DataFrames that have streaming sources

df.printSchema()
 * 
 ** Display(df)

On using the display command, after some amount of time we got the below error:

Lost connection to cluster. The notebook may have been detached or the cluster 
may have been terminated due to an error in the driver such as an 
OutOfMemoryError.

  What we see in the logs is the same UnknownHostException stack trace as above.

[jira] [Updated] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks

2020-11-04 Thread navin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

navin updated KAFKA-10682:
--
Description: 
We have a Windows Kafka cluster:
 * We enabled inbound and outbound for port 9092/9093
 * The topic returns results in a local Windows cmd session using
 ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning 
--bootstrap-server 10.53.56.140:9092
 * We are trying to consume the topic from Azure Databricks
 ** Simple ping and telnet work fine and connect to the underlying server 
 *** %sh telnet 10.53.56.140 9092
 *** %sh ping 10.53.56.140
 ** df = spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
 .option("subscribe", "SIP.SIP.SHIPMENT") \
 .option("minPartitions", "10") \
 .option("startingOffsets", "earliest") \
 .load()
 #df.isStreaming() # Returns True for DataFrames that have streaming sources

df.printSchema()
 * 
 ** Display(df)

On using the display command, after some amount of time we got the below error:

Lost connection to cluster. The notebook may have been detached or the cluster 
may have been terminated due to an error in the driver such as an 
OutOfMemoryError.

  What we see in the logs is the same UnknownHostException stack trace as above.

[GitHub] [kafka] cmccabe opened a new pull request #9557: Kip 500 move legacy

2020-11-04 Thread GitBox


cmccabe opened a new pull request #9557:
URL: https://github.com/apache/kafka/pull/9557


   







[GitHub] [kafka] cmccabe closed pull request #9557: Kip 500 move legacy

2020-11-04 Thread GitBox


cmccabe closed pull request #9557:
URL: https://github.com/apache/kafka/pull/9557


   







[GitHub] [kafka] mikebin commented on pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner

2020-11-04 Thread GitBox


mikebin commented on pull request #8690:
URL: https://github.com/apache/kafka/pull/8690#issuecomment-721907104


   Looks like this is still pending completion? Just wanted to check on the status of getting this merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks

2020-11-04 Thread navin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226342#comment-17226342
 ] 

navin commented on KAFKA-10682:
---

Uncommented the command below in server.properties, located at kafka\config; the issue got resolved:

listeners = PLAINTEXT://10.53.56.140:9092
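
For anyone hitting the same UnknownHostException: the client must be able to resolve whatever address the broker advertises. A minimal Java sketch (reusing the IP and topic from this issue; not part of the original report) to verify the advertised listener is reachable from the client host:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ListenerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Must be an address the client can resolve; if the broker advertises
        // an unresolvable hostname instead, metadata requests fail with
        // UnknownHostException as in the logs above.
        props.put("bootstrap.servers", "10.53.56.140:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor forces a metadata round trip to the broker,
            // exercising the advertised listener address end to end.
            System.out.println(consumer.partitionsFor("SIP.SIP.SHIPMENT"));
        }
    }
}
```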

> Windows Kafka cluster not reachable via Azure data Bricks
> -
>
> Key: KAFKA-10682
> URL: https://issues.apache.org/jira/browse/KAFKA-10682
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.6.0
>Reporter: navin
>Priority: Minor
>
> We have a Windows Kafka cluster,
>  * We enabled inbound and outbound for port 9092/9093
>  * The topic returns results on the local Windows cmd using
>  ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning 
> --bootstrap-server 10.53.56.140:9092
>  * We are trying to consume the topic from Azure Databricks
>  ** Simple ping and telnet work fine and connect to the underlying server 
>  *** %sh telnet 10.53.56.140 9092
>  *** %sh ping 10.53.56.140
>  ** df = spark \
>  .readStream \
>  .format("kafka") \
>  .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
>  .option("subscribe", "SIP.SIP.SHIPMENT") \
>  .option("minPartitions", "10") \
>  .option("startingOffsets", "earliest") \
>  .load()
>  #df.isStreaming() # Returns True for DataFrames that have streaming sources
> df.printSchema()
>  * 
>  ** Display(df)
> On using the display command, after some amount of time we got the error below:
> Lost connection to cluster. The notebook may have been detached or the 
> cluster may have been terminated due to an error in the driver such as an 
> OutOfMemoryError.
>   What we see in the logs is the error below:
> 20/11/04 18:23:52 WARN NetworkClient: [Consumer 
> clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
>  
> groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
>  Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: 
> null)20/11/04 18:23:52 WARN NetworkClient: [Consumer 
> clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
>  
> groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
>  Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: 
> null)java.net.UnknownHostException: Navin.us.corp.tim.com at 
> java.net.InetAddress.getAllByName0(InetAddress.java:1281) at 
> java.net.InetAddress.getAllByName(InetAddress.java:1193) at 
> java.net.InetAddress.getAllByName(InetAddress.java:1127) at 
> kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>  at 
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scal

[jira] [Resolved] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks

2020-11-04 Thread navin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

navin resolved KAFKA-10682.
---
Resolution: Fixed

In kafka\config\server.properties, add:

listeners = PLAINTEXT://10.53.56.140:9092

> Windows Kafka cluster not reachable via Azure data Bricks
> -
>
> Key: KAFKA-10682
> URL: https://issues.apache.org/jira/browse/KAFKA-10682
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.6.0
>Reporter: navin
>Priority: Minor
>
> We have a Windows Kafka cluster,
>  * We enabled inbound and outbound for port 9092/9093
>  * The topic returns results on the local Windows cmd using
>  ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning 
> --bootstrap-server 10.53.56.140:9092
>  * We are trying to consume the topic from Azure Databricks
>  ** Simple ping and telnet work fine and connect to the underlying server 
>  *** %sh telnet 10.53.56.140 9092
>  *** %sh ping 10.53.56.140
>  ** df = spark \
>  .readStream \
>  .format("kafka") \
>  .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
>  .option("subscribe", "SIP.SIP.SHIPMENT") \
>  .option("minPartitions", "10") \
>  .option("startingOffsets", "earliest") \
>  .load()
>  #df.isStreaming() # Returns True for DataFrames that have streaming sources
> df.printSchema()
>  * 
>  ** Display(df)
> On using the display command, after some amount of time we got the error below:
> Lost connection to cluster. The notebook may have been detached or the 
> cluster may have been terminated due to an error in the driver such as an 
> OutOfMemoryError.
>   What we see in the logs is the error below:
> 20/11/04 18:23:52 WARN NetworkClient: [Consumer 
> clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
>  
> groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
>  Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: 
> null)20/11/04 18:23:52 WARN NetworkClient: [Consumer 
> clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5,
>  
> groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0]
>  Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: 
> null)java.net.UnknownHostException: Navin.us.corp.tim.com at 
> java.net.InetAddress.getAllByName0(InetAddress.java:1281) at 
> java.net.InetAddress.getAllByName(InetAddress.java:1193) at 
> java.net.InetAddress.getAllByName(InetAddress.java:1127) at 
> kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>  at 
> kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
>  at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>  at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>  at 
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540)
>  at 
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(Kafk

[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517562348



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target 
target, Versions versions
 target.sourceVariable(), target.sourceVariable(;
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new 
BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", 
target.sourceVariable(;

Review comment:
   @dajac if I understand correctly, the current generated JSON does not 
print the recordSet either. Is it breaking anything to return the size rather 
than an empty array?
   ```
   }
   if (_object.recordSet == null) {
   _node.set("recordSet", NullNode.instance);
   } else {
   _node.set("recordSet", new BinaryNode(new byte[]{}));
   }
   ```
   
   That said, implementing a verbose mode (3) where we print out a byte array 
seems reasonable if we don't use it in the trace logging path by default?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517576758



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +317,22 @@ public void run() {
 "Updating global state failed. You can restart KafkaStreams to 
recover from this error.",
 recoverableException
 );
+} catch (final Exception e) {
+if (this.streamsUncaughtExceptionHandler == null) {
+throw e;
+}
+if (Thread.getDefaultUncaughtExceptionHandler() != null && 
newHandler) {
+log.error("Stream's new uncaught exception handler is set as 
well as the deprecated old handler." +
+"The old handler will be ignored as long as a new 
handler is set.");

Review comment:
I think it is simpler to check in the StreamThread, because in KafkaStreams we don't know whether the handlers have been set; we would have to check both the stream threads and the global thread, so it is much easier to just check in the thread. I do agree that it should be bumped down to warn though.
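
A minimal sketch of the precedence logic being discussed (hypothetical names, not the PR's actual code): warn when both the deprecated JVM-wide handler and the new streams-specific handler are set, and prefer the new one.

```java
import java.util.function.Consumer;

// Hypothetical sketch only; the field and method names differ in the PR.
final class HandlerPrecedenceSketch {
    private final Consumer<Throwable> streamsHandler; // may be null

    HandlerPrecedenceSketch(Consumer<Throwable> streamsHandler) {
        this.streamsHandler = streamsHandler;
    }

    void handle(Throwable t) {
        if (streamsHandler == null) {
            throw new RuntimeException(t); // old behavior: let the JVM handler see it
        }
        if (Thread.getDefaultUncaughtExceptionHandler() != null) {
            // warn, not error: the deprecated handler is ignored while the new one is set
            System.err.println("WARN: deprecated uncaught exception handler is ignored");
        }
        streamsHandler.accept(t);
    }
}
```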





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9544: MINOR: Add back 2.6 notable update section taken out by mistake

2020-11-04 Thread GitBox


bbejeck merged pull request #9544:
URL: https://github.com/apache/kafka/pull/9544


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9544: MINOR: Add back 2.6 notable update section taken out by mistake

2020-11-04 Thread GitBox


bbejeck commented on pull request #9544:
URL: https://github.com/apache/kafka/pull/9544#issuecomment-721937375


   Merged #9544 into trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-11-04 Thread GitBox


rajinisivaram commented on pull request #7751:
URL: https://github.com/apache/kafka/pull/7751#issuecomment-721940645


   Streams test failure not related, merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram merged pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-11-04 Thread GitBox


rajinisivaram merged pull request #7751:
URL: https://github.com/apache/kafka/pull/7751


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram closed pull request #9550: [DO NOT MERGE] Temporary PR to track test failure in Jenkins build

2020-11-04 Thread GitBox


rajinisivaram closed pull request #9550:
URL: https://github.com/apache/kafka/pull/9550


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10683) Consumer.position() Ignores Transaction Marker with read_uncommitted

2020-11-04 Thread Gary Russell (Jira)
Gary Russell created KAFKA-10683:


 Summary: Consumer.position() Ignores Transaction Marker with 
read_uncommitted
 Key: KAFKA-10683
 URL: https://issues.apache.org/jira/browse/KAFKA-10683
 Project: Kafka
  Issue Type: Bug
  Components: clients, core
Affects Versions: 2.6.0
Reporter: Gary Russell


The workaround for https://issues.apache.org/jira/browse/KAFKA-6607# Says:

{quote}
or use `consumer.position()` that takes the commit marker into account and 
would "step over it")
{quote}

Note that this problem occurs with all consumers, not just Streams. We have 
implemented this solution in our project (as an option for those users 
concerned about the pseudo lag).

We have discovered that this technique will only work with 
{code}isolation.level=read_committed{code}. Otherwise, the 
{code}position(){code} call does not include the marker "record".

https://github.com/spring-projects/spring-kafka/issues/1587#issuecomment-721899560
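
A minimal Java sketch of the workaround (hypothetical topic name; localhost broker assumed): with read_committed, position() accounts for the commit marker, so it can reach the end offset even though the marker itself is never delivered.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PositionAfterMarker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-check");
        // The workaround only steps over the transaction marker with
        // read_committed; with read_uncommitted, position() stops before it.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        TopicPartition tp = new TopicPartition("some-topic", 0); // hypothetical topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.poll(Duration.ofSeconds(1));
            // With read_committed this position includes the commit marker,
            // avoiding the pseudo lag described above.
            System.out.println("position: " + consumer.position(tp));
        }
    }
}
```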



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2020-11-04 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7987.
---
Fix Version/s: 2.8.0
 Reviewer: Jun Rao
   Resolution: Fixed

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Critical
> Fix For: 2.8.0
>
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  
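
For readers following the fix: the idea is to treat an auth failure like a session expiration and rebuild the client, rather than only logging. A minimal sketch of that idea (hypothetical class, not Kafka's actual ZooKeeperClient code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the fix's approach; names and details differ in the PR.
final class ZkSessionSketch {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private volatile Object zooKeeper; // stand-in for the real ZooKeeper handle

    void onAuthFailed() {
        // Before the fix: log and stay alive but unregistered in ZK.
        // After the fix: schedule a session re-initialization with a delay,
        // so a transient KDC outage does not permanently orphan the broker.
        scheduler.schedule(this::reinitialize, 5, TimeUnit.SECONDS);
    }

    private void reinitialize() {
        // close the old handle and create a new session (details elided)
        zooKeeper = new Object();
    }
}
```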



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517621757



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -294,7 +304,10 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
   final long cacheSizeBytes,
   final StateDirectory stateDirectory,
   final StateRestoreListener 
userStateRestoreListener,
-  final int threadIdx) {
+  final int threadIdx,
+  final ShutdownErrorHook 
shutdownErrorHook,
+  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler,
+  final AtomicInteger assignmentErrorCode) 
{

Review comment:
   You are right it seems that it is not necessary 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517627843



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
##
@@ -19,7 +19,7 @@
 public final class StreamsAssignmentProtocolVersions {
 public static final int UNKNOWN = -1;
 public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;

Review comment:
Thanks for the reminder. I think I understood the test as incrementing to the next version, as the version is now 9.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517629961



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[numStreamThreads];
-
+threads = new LinkedList<>();

Review comment:
They are not all in synchronized blocks, so I think it probably should be thread safe.
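
One way to make the resizable thread list safe for concurrent access, as a sketch (the PR may do it differently; a CopyOnWriteArrayList is another option when resizes are rare):

```java
import java.util.LinkedList;
import java.util.List;

final class ThreadListSketch {
    // Guard every access with a common lock so adds, removes, and reads
    // from different threads never observe the list mid-update.
    private final List<Thread> threads = new LinkedList<>();

    void add(Thread t) {
        synchronized (threads) {
            threads.add(t);
        }
    }

    int size() {
        synchronized (threads) {
            return threads.size();
        }
    }
}
```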





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517630194



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);

Review comment:
   yep, should have changed that when I moved from ArrayList





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-04 Thread GitBox


wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517630320



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
   Good catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-04 Thread GitBox


junrao commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-721987780


   @chia7712 : Thanks for the PR. 
   
   I ran the following command with the PR.
   bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config 
kafka.properties --create --topic test1
   
   kafka.properties 
   ```
   security.protocol=SSL
   ssl.protocol=TLS
   ```
   
   I still saw the WARN.
   
   `[2020-11-04 13:32:16,059] WARN The configuration 'ssl.protocol' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.admin.AdminClientConfig)
   `
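
For readers following along, a toy sketch of the usage-tracking behavior that produces this warning (hypothetical class, not Kafka's AbstractConfig): any supplied key that is never read through get(...) is reported as unknown.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the warning's mechanism, for illustration only.
final class TrackingConfigSketch {
    private final Map<String, Object> supplied = new HashMap<>();
    private final Set<String> used = new HashSet<>();

    TrackingConfigSketch(Map<String, Object> props) {
        supplied.putAll(props);
    }

    Object get(String key) {
        used.add(key); // record that this key was actually consumed
        return supplied.get(key);
    }

    void logUnused() {
        for (String key : supplied.keySet()) {
            if (!used.contains(key)) {
                System.out.printf(
                    "WARN The configuration '%s' was supplied but isn't a known config.%n", key);
            }
        }
    }
}
```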



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517645718



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   Yes, I kept it because `requestDescMetrics` originally had the 
`detailsEnabled`, but I'll remove it since none of the JsonConverters 
differentiates between verbose and non-verbose.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517649219



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 
DescribeUserScramCredentialsReq

[GitHub] [kafka] niteshmor edited a comment on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-04 Thread GitBox


niteshmor edited a comment on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-721898016


   For applying this fix to older branches, this commit can be cherry-picked to 
   the `2.7` branch. For branches `2.6` and earlier, the `jersey` version also 
   needs to be upgraded to `2.31`, based on the following note in the `2.5` and 
   `2.6` branches that are on jetty `9.4.24`: 
https://github.com/apache/kafka/blob/2.6/gradle/dependencies.gradle#L71
   
   To validate that this change can be backported, I have prepared commits in 
   my own fork for older branches going back to 2.4 and ran the tests locally.
   Note that I only used Java 11 in my local testing. 
   To assist in applying the fix to older branches, these can serve as a 
reference
   for what the exact change is. Or I can create new pull requests to the older
   branches if that's easier.
   
   - For 2.7, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/8b51bd18f4541d1edb7d423a01314f78f1988fe8
   - For 2.6, 2 tests failed locally, but I believe they are unrelated. The 
failed tests are:
 - `SslAdminIntegrationTest.testCreateTopicsResponseMetadataAndConfig`
 - `ReplicaManagerTest.testFencedErrorCausedByBecomeLeader`
 Reference commit: 
https://github.com/niteshmor/kafka/commit/a0d3dc0f2e490d04b324ad72c47cd1363cc9dd6c
   - For 2.5, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/f8f68c334132dd293d96d5be1a1c3d307896cefa
   - For 2.4, unit/integration tests pass locally. Reference commit: 
https://github.com/niteshmor/kafka/commit/9c08944c3c793c7733e0971a68dcb580dd4c288f
  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-04 Thread GitBox


niteshmor commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-721998495


   Test failure seems to be unrelated. 
   
   `java.nio.file.DirectoryNotEmptyException` in 
`org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-11-04 Thread GitBox


abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-722005317


   Mvn failure is not related, merging



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-11-04 Thread GitBox


abbccdda merged pull request #9103:
URL: https://github.com/apache/kafka/pull/9103


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target 
target, Versions versions
 target.sourceVariable(), target.sourceVariable(;
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new 
BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", 
target.sourceVariable(;

Review comment:
   @lbradstreet The current generated JSON does not print the recordSet 
either. But when we are serializing a BinaryNode with an empty array, it is 
still being deserialized as a BinaryNode, so it doesn't break anything.
   ```
   buffer.printf("%s;%n", target.assignmentStatement(
   
String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s,
 \"%s\")))",
   target.sourceVariable(), target.humanReadableName(;
   ```
So I believe the concern is that I had proposed to change the serialization 
to an IntNode, but the deserialization still expects a BinaryNode, which I 
should have also changed.
Alternatively, I could fix this by changing the deserialization to expect an 
IntNode, but I'm in favor of adding a verbose flag.
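
A small Jackson sketch of the asymmetry being discussed (assuming a hypothetical record set of 42 bytes): the proposed serialization emits only the size, but the existing deserialization path expects binary data.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.IntNode;

public class RecordSetNodeSketch {
    public static void main(String[] args) throws Exception {
        int sizeInBytes = 42; // stand-in for recordSet.sizeInBytes()

        // Proposed serialization: emit only the size of the record set.
        JsonNode serialized = new IntNode(sizeInBytes);

        // Reading the IntNode back as bytes does not round-trip:
        // binaryValue() returns null for non-binary nodes.
        System.out.println(serialized.binaryValue()); // null, not a byte[]

        // The old serialization kept the round trip consistent (if empty):
        JsonNode old = new BinaryNode(new byte[]{});
        System.out.println(old.binaryValue().length); // 0
    }
}
```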
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   Just read through the comment below, so I'll keep the `verbose` flag for 
now, but I'll remove it depending on what we decide below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517672247



##
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: 
OffsetAndEpoch) extends SnapshotReader {

Review comment:
   Sounds good. I'll move this code over and implement it in Java.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517699783



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target 
target, Versions versions
 target.sourceVariable(), target.sourceVariable(;
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new 
BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", 
target.sourceVariable(;

Review comment:
   Nvm, we can't deserialize `recordSet` to an IntNode because 
`FetchResponseData` expects `recordSet` to be of type `BaseRecords`, not `int`.
What I've done is leave the deserialization the same and add the verbose flag 
to serialization.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
This comment applies to some of your other observations. At a high level 
there are 4 use cases that we need to design and implement. Two use cases are 
for the Raft implementation. Two use cases are for the state machine.
   
### Raft implementation
These types are internal to the raft implementation and don't have to be 
exposed to the state machine.
#### Leader Use Case
The leader needs to be able to send a part of the snapshot over the network. 
Something like this:
```java
interface SnapshotReader extends Closeable {
    long transferTo(long position, long maxBytes, WritableChannel channel);
    int read(ByteBuffer buffer, long position);
}
```
#### Follower Use Case
The followers need to be able to copy bytes from the network and validate 
the snapshot on disk when fetching is done.
```java
interface SnapshotWriter extends Closeable {
    void append(ByteBuffer buffer);
    void validate();
    void freeze();
}
```
### State machine implementation
These types are exposed to the state machine.
#### Load Snapshot
The state machine needs to be able to load/scan the entire snapshot. The 
state machine can use `close` to tell the raft client that it finished loading 
the snapshot. This will be implemented in a future PR, but it could look like 
this:
```java
interface BatchedSnapshotReader<T> extends Iterable<Batch<T>>, Closeable {
}
```
#### Generate Snapshot
The state machine needs to be able to generate a snapshot by appending 
records/values and marking the snapshot as immutable (`freeze`) when it is done.
```java
interface BatchedSnapshotWriter<T> extends Closeable {
    void append(Iterable<T> records);
    void freeze();
}
```

`SnapshotWriter` and `SnapshotReader` need to be interfaces because we will 
have a real implementation and a mocked implementation for testing. These two 
types are internal to raft and are not exposed to the state machine.

`BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on 
`SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not 
strictly required. These two types don't have to be `interfaces` if they 
delegate the IO to `SnapshotWriter` and `SnapshotReader`.

What do you think @hachikuji?
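
As a concrete illustration of the leader use case above, a minimal file-backed sketch using plain NIO (`FileSnapshotReaderSketch` is a hypothetical name; this is not the actual Kafka implementation):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Serve a slice of the snapshot to a follower without loading
// the whole file into memory.
final class FileSnapshotReaderSketch implements AutoCloseable {
    private final FileChannel channel;

    FileSnapshotReaderSketch(Path snapshotPath) throws IOException {
        this.channel = FileChannel.open(snapshotPath, StandardOpenOption.READ);
    }

    long transferTo(long position, long maxBytes, WritableByteChannel dest) throws IOException {
        // FileChannel.transferTo can use zero-copy (sendfile) on many platforms.
        return channel.transferTo(position, maxBytes, dest);
    }

    int read(ByteBuffer buffer, long position) throws IOException {
        return channel.read(buffer, position);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```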





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703625



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {
+
+public OffsetAndEpoch snapshotId();
+
+public long sizeInBytes();
+
+public Iterator<RecordBatch> iterator();
+
+public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
   I think I address this in this comment 
https://github.com/apache/kafka/pull/9512#discussion_r517703078. If you agree 
let's move the conversation there.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703953



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+// TODO: Write documentation for this type and all of the methods
+final public class BatchedSnapshotWriter<T> implements Closeable {

Review comment:
   I cover some of the motivation for this here: 
https://github.com/apache/kafka/pull/9512#discussion_r517703078.
   
   Let's move the conversation there if you agree.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
   This comment applies to some of your other observations. At a high level 
there are 4 use cases that we need to design and implement. Two are for the 
Raft implementation and two are for the state machine.
   
   ### Raft implementation
   These types are internal to the raft implementation and don't have to be 
exposed to the state machine.
   #### Leader Use Case
   The leader needs to be able to send a part of the snapshot over the network. 
Something like this
   ```java
   interface SnapshotReader extends Closeable {
   long transferTo(long position, long maxBytes, WritableChannel channel);
   int read(ByteBuffer buffer, long position);
   }
   ```
   #### Follower Use Case
   The followers need to be able to copy bytes from the network and validate 
the snapshot on disk when fetching is done.
   ```java
   interface SnapshotWriter extends Closeable {
   void append(ByteBuffer buffer);
   void validate();
   void freeze();
   }
   ```
   ### State machine implementation
   These types are exposed to the state machine.
   #### Load Snapshot
   The state machine needs to be able to load/scan the entire snapshot. The 
state machine can use `close` to tell the raft client that it finished loading 
the snapshot. This will be implemented in a future PR but it could look like 
this:
   ```java
   interface BatchedSnapshotReader<T> extends Iterable<List<T>>, Closeable {
   }
   ```
   #### Generate Snapshot
   The state machine needs to be able to generate a snapshot by appending 
records/values and marking the snapshot as immutable (`freeze`) when it is done.
   ```java
   interface BatchedSnapshotWriter<T> extends Closeable {
       void append(Iterable<T> records);
       void freeze();
   }
   ```
   
   ### Notes
   
   `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will 
have a real implementation and a mocked implementation for testing. These two 
types are internal to raft and are not exposed to the state machine.
   
   `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on 
`SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not 
strictly required. These two types don't have to be `interfaces` if they 
delegate the IO to `SnapshotWriter` and `SnapshotReader`.
   
   What do you think @hachikuji?
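   
   For illustration, here is a minimal, self-contained sketch of how a state 
machine might drive the two state-machine-facing types. The names and 
signatures follow the proposal above and are assumptions, not a final API:
   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.List;
   
   interface BatchedSnapshotWriter<T> extends Closeable {
       void append(Iterable<T> records) throws IOException;
       void freeze() throws IOException;
   }
   
   interface BatchedSnapshotReader<T> extends Iterable<List<T>>, Closeable {}
   
   class SnapshotUsageSketch {
       // Generate: append the state in batches, then mark the snapshot immutable.
       static void generate(BatchedSnapshotWriter<String> writer, List<String> state) throws IOException {
           try (writer) {
               writer.append(state);
               writer.freeze();
           }
       }
   
       // Load: scan every batch; close() tells the raft client loading finished.
       static void load(BatchedSnapshotReader<String> reader, List<String> state) throws IOException {
           try (reader) {
               for (List<String> batch : reader) {
                   state.addAll(batch);
               }
           }
       }
   }
   ```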





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding

2020-11-04 Thread GitBox


abbccdda opened a new pull request #9558:
URL: https://github.com/apache/kafka/pull/9558


   This PR will follow up 
https://github.com/apache/kafka/commit/0814e4f645880a3c63102fc197c8912c63846ad5 
to migrate the remaining RPCs which need forwarding:
   
   - CreateAcls
   - DeleteAcls
   - CreateDelegationToken
   - RenewDelegationToken
   - ExpireDelegationToken
   - AlterPartitionReassignment
   - CreatePartition
   - DeleteTopics
   - UpdateFeatures 
   - Scram
   
   We also refactored the `KafkaApisTest` to make it easier to add new unit 
tests for the forwarding path of new RPCs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9559: HOTFIX: RequestContext constructor change

2020-11-04 Thread GitBox


abbccdda opened a new pull request #9559:
URL: https://github.com/apache/kafka/pull/9559


   Hit an unfortunate merge conflict at the same time with 
https://github.com/apache/kafka/commit/5df8457e05f6808145e90f5637d7f8a4aed548d9
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10181) Create Envelope RPC and redirection template for configuration change RPCs

2020-11-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10181.
-
Resolution: Fixed

> Create Envelope RPC and redirection template for configuration change RPCs
> --
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> In the bridge release broker, 
> AlterConfig/IncrementalAlterConfig/CreateTopics/AlterClientQuota should be 
> redirected to the active controller. This ticket will ensure those RPCs get 
> redirected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda opened a new pull request #9560: KAFKA-10345: Add ZK-notification based update for trust/key store paths

2020-11-04 Thread GitBox


abbccdda opened a new pull request #9560:
URL: https://github.com/apache/kafka/pull/9560


   SSL trust store and key store path updates can no longer go through the 
direct per-broker update due to forwarding. We need to add a mechanism to 
trigger the update through a ZK notification.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration

2020-11-04 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam reassigned KAFKA-10624:


Assignee: Kowshik Prakasam

> [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
> 
>
> Key: KAFKA-10624
> URL: https://issues.apache.org/jira/browse/KAFKA-10624
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Minor
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you 
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
> you add a new value that is not handled in a given pattern match.
> This Jira tracks refactoring 
> [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801]
>  from an enum to a sealed trait. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik opened a new pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-04 Thread GitBox


kowshik opened a new pull request #9561:
URL: https://github.com/apache/kafka/pull/9561


   In this PR, I've switched the `FeatureZNodeStatus` enum to be a sealed 
trait. In Scala, we prefer sealed traits over Enumeration since the former 
gives you exhaustiveness checking. With Scala enumeration, you don't get a 
warning if you add a new value that is not handled in a given pattern match.
   
   **Test plan:**
   Rely on existing tests.
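   
   As an aside, a compact illustration of the exhaustiveness point, written in 
Java 21 syntax since the other snippets in this thread are Java (the actual 
refactor is in Scala; the value names below just mirror the ZkData ones):
   ```java
   // Hypothetical Java analog: a sealed hierarchy lets the compiler verify
   // that every case is handled, which Enumeration-style constants cannot.
   public class FeatureZNodeStatusSketch {
       sealed interface FeatureZNodeStatus permits Enabled, Disabled {}
       record Enabled() implements FeatureZNodeStatus {}
       record Disabled() implements FeatureZNodeStatus {}
   
       static String describe(FeatureZNodeStatus s) {
           // Exhaustive switch: dropping a case is a compile-time error.
           return switch (s) {
               case Enabled e -> "enabled";
               case Disabled d -> "disabled";
           };
       }
   }
   ```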



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-04 Thread GitBox


kowshik commented on pull request #9561:
URL: https://github.com/apache/kafka/pull/9561#issuecomment-722097335


   @junrao @abbccdda: this PR is ready for review. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow

2020-11-04 Thread GitBox


kowshik opened a new pull request #9562:
URL: https://github.com/apache/kafka/pull/9562


   Fixed the param doc in `FinalizedFeatureChangeListener.initOrThrow` method. 
The parameter `waitOnceForCacheUpdateMs` is expected to be > 0, but the doc was 
incorrect.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-04 Thread GitBox


kowshik commented on pull request #9561:
URL: https://github.com/apache/kafka/pull/9561#issuecomment-722113042


   The build in CI is broken due to https://github.com/apache/kafka/pull/9559 . 
I'll rebase on top of latest AK trunk, after that PR is merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow

2020-11-04 Thread GitBox


kowshik commented on pull request #9562:
URL: https://github.com/apache/kafka/pull/9562#issuecomment-722119275


   The build in CI is broken due to #9559 . I'll rebase on top of latest AK 
trunk, after that PR is merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow

2020-11-04 Thread GitBox


kowshik edited a comment on pull request #9562:
URL: https://github.com/apache/kafka/pull/9562#issuecomment-722119275


   The build in CI is broken due to absence of #9559. I'll rebase on top of 
latest AK trunk, after that PR is merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-04 Thread GitBox


kowshik edited a comment on pull request #9561:
URL: https://github.com/apache/kafka/pull/9561#issuecomment-722113042


   The build in CI is broken due to absence of 
https://github.com/apache/kafka/pull/9559. I'll rebase on top of latest AK 
trunk, after that PR is merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9559: HOTFIX: RequestContext constructor change

2020-11-04 Thread GitBox


chia7712 merged pull request #9559:
URL: https://github.com/apache/kafka/pull/9559


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow

2020-11-04 Thread GitBox


chia7712 commented on pull request #9562:
URL: https://github.com/apache/kafka/pull/9562#issuecomment-722124176


   @kowshik I have merged #9559. Could you rebase PR to trigger QA again?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-04 Thread GitBox


chia7712 commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-722128511


   @junrao Thanks for your response. fixed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow

2020-11-04 Thread GitBox


kowshik commented on pull request #9562:
URL: https://github.com/apache/kafka/pull/9562#issuecomment-722130040


   @chia7712 I've rebased now on top of #9559. Hopefully the build will go 
green. Thank you for the review!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10684) Avoid additional copies in envelope path when transmitting over network

2020-11-04 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10684:
---

 Summary: Avoid additional copies in envelope path when 
transmitting over network 
 Key: KAFKA-10684
 URL: https://issues.apache.org/jira/browse/KAFKA-10684
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When we send an envelope request or response, we first allocate a buffer for 
the embedded data. When we are ready to transmit the data, we allocate a new 
buffer for the full envelope and copy the embedded data to it. We can skip the 
second copy if we are a little smarter when translating the envelope data to 
the network `Send` object.
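
For illustration, a minimal sketch of the buffer-level idea (not the actual 
`SendBuilder` in the follow-up patch): a gathering write can transmit the 
envelope header and the already-serialized embedded data as separate buffers, 
so no merged copy is needed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

// Hypothetical sketch: send an envelope header and the already-serialized
// embedded request as two buffers instead of copying both into one
// freshly allocated buffer.
final class TwoBufferSend {
    private final ByteBuffer[] buffers;

    TwoBufferSend(ByteBuffer header, ByteBuffer embeddedData) {
        this.buffers = new ByteBuffer[] { header, embeddedData };
    }

    // A gathering write drains both buffers in order with no extra copy.
    long writeTo(GatheringByteChannel channel) throws IOException {
        return channel.write(buffers);
    }

    boolean completed() {
        return !buffers[0].hasRemaining() && !buffers[1].hasRemaining();
    }
}
```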



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji opened a new pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-04 Thread GitBox


hachikuji opened a new pull request #9563:
URL: https://github.com/apache/kafka/pull/9563


   This patch creates a new `SendBuilder` class which allows us to avoid 
copying "bytes" types when transmitting an api message over the network. This 
is used in `EnvelopeRequest` and `EnvelopeResponse` to avoid copying the 
embedded data.
   
   The patch also contains a few minor cleanups such as moving envelope parsing 
logic into `RequestContext`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups getting interpreted as a timezone

2020-11-04 Thread Russell Sayers (Jira)
Russell Sayers created KAFKA-10685:
--

 Summary: --to-datetime passed to kafka-consumer-groups getting 
interpreted as a timezone
 Key: KAFKA-10685
 URL: https://issues.apache.org/jira/browse/KAFKA-10685
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Russell Sayers


If you pass more than 3 decimal places for the fractional seconds of the 
datetime, the microseconds get interpreted as milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{ --reset-offsets \}}
{{ --group webserver-avro \}}
{{ --topic driver-positions-avro \}}
{{ --to-datetime "2020-11-05T00:46:48.002237400" \}}
{{ --dry-run}}

Relevant code 
[here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].

Experimenting with getDateTime:
 * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
 * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
formatting string allows for ZZZ timezones
 * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends 
with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
interprets "000123" as 123 milliseconds. See the stackoverflow answer 
[here|https://stackoverflow.com/a/21235602/109102].

The fix?  Drop any digits beyond the first 3 after the decimal point, or raise 
an exception. The code would still need to allow the RFC822 timezone, i.e. 
Sign TwoDigitHours Minutes.
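
For illustration, a small standalone reproduction of the lenient-parsing 
behavior described above, plus the suggested truncation (class name is mine):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class LenientFractionDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        // Lenient parsing consumes the whole digit run as the SSS field, so
        // "002237400" is taken as 2,237,400 ms (~37 minutes) and rolls the
        // parsed time forward past 00:46:48.
        Date wrong = fmt.parse("2020-11-05T00:46:48.002237400");
        System.out.println(wrong); // ~01:24:05, not 00:46:48

        // The suggested fix: keep at most 3 fractional digits before parsing;
        // an optional RFC822 timezone suffix (e.g. "+0800") is preserved.
        String truncated = "2020-11-05T00:46:48.002237400"
                .replaceFirst("(\\.\\d{3})\\d+", "$1");
        System.out.println(fmt.parse(truncated)); // 00:46:48 as expected
    }
}
```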

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong

2020-11-04 Thread Russell Sayers (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Sayers updated KAFKA-10685:
---
Summary: --to-datetime passed to kafka-consumer-groups interpreting 
microseconds wrong  (was: --to-datetime passed to kafka-consumer-groups getting 
interpreted as a timezone)

> --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
> -
>
> Key: KAFKA-10685
> URL: https://issues.apache.org/jira/browse/KAFKA-10685
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Russell Sayers
>Priority: Minor
>
> If you pass more than 3 decimal places for the fractional seconds of the 
> datetime, the microseconds get interpreted as milliseconds.
> {{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
> {{ --reset-offsets \}}
> {{ --group webserver-avro \}}
> {{ --topic driver-positions-avro \}}
> {{ --to-datetime "2020-11-05T00:46:48.002237400" \}}
> {{ --dry-run}}
> Relevant code 
> [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
> Experimenting with getDateTime:
>  * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
>  * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
> formatting string allows for ZZZ timezones
>  * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this 
> ends with 123 milliseconds.
> The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
> interprets "000123" as 123 milliseconds. See the stackoverflow answer 
> [here|https://stackoverflow.com/a/21235602/109102].
> The fix?  Drop any digits beyond the first 3 after the decimal point, or 
> raise an exception. The code would still need to allow the RFC822 timezone, 
> i.e. Sign TwoDigitHours Minutes.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong

2020-11-04 Thread Russell Sayers (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Sayers updated KAFKA-10685:
---
Description: 
f you pass more than 3 decimal places for the fractional seconds of the 
datetime, the microseconds get interpreted as milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{--reset-offsets \}}
{{--group webserver-avro \}}
{{--topic driver-positions-avro \}}
{{--to-datetime "2020-11-05T00:46:48.002237400" \}}
{{--dry-run}}

Relevant code 
[here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
 The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.

Experimenting with getDateTime:
 * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
 * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
formatting string allows for ZZZ timezones
 * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends 
with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
interprets "000123" as 123 milliseconds. See the stackoverflow answer 
[here|https://stackoverflow.com/a/21235602/109102].

The fix?  Drop any digits beyond the first 3 after the decimal point, or raise 
an exception. The code would still need to allow the RFC822 timezone, i.e. 
Sign TwoDigitHours Minutes.

 

  was:
If you pass more than 3 decimal places for the fractional seconds of the 
datetime, the microseconds get interpreted as milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{ --reset-offsets \}}
{{ --group webserver-avro \}}
{{ --topic driver-positions-avro \}}
{{ --to-datetime "2020-11-05T00:46:48.002237400" \}}
{{ --dry-run}}

Relevant code 
[here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].

Experimenting with getDateTime:
 * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
 * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
formatting string allows for ZZZ timezones
 * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends 
with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
interprets "000123" as 123 milliseconds. See the stackoverflow answer 
[here|https://stackoverflow.com/a/21235602/109102].

The fix?  Drop any digits beyond the first 3 after the decimal point, or raise 
an exception. The code would still need to allow the RFC822 timezone, i.e. 
Sign TwoDigitHours Minutes.

 


> --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
> -
>
> Key: KAFKA-10685
> URL: https://issues.apache.org/jira/browse/KAFKA-10685
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Russell Sayers
>Priority: Minor
>
> If you pass more than 3 decimal places for the fractional seconds of the 
> datetime, the microseconds get interpreted as milliseconds.
> {{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
> {{--reset-offsets \}}
> {{--group webserver-avro \}}
> {{--topic driver-positions-avro \}}
> {{--to-datetime "2020-11-05T00:46:48.002237400" \}}
> {{--dry-run}}
> Relevant code 
> [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
>  The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
> SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.
> Experimenting with getDateTime:
>  * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
>  * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
> formatting string allows for ZZZ timezones
>  * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this 
> ends with 123 milliseconds.
> The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
> interprets "000123" as 123 milliseconds. See the stackoverflow answer 
> [here|https://stackoverflow.com/a/21235602/109102].
> The fix?  Drop any digits beyond the first 3 after the decimal point, or 
> raise an exception. The code would still need to allow the RFC822 timezone, 
> i.e. Sign TwoDigitHours Minutes.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong

2020-11-04 Thread Russell Sayers (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Sayers updated KAFKA-10685:
---
Description: 
If you pass more than 3 decimal places for the fractional seconds of the 
datetime, the microseconds get interpreted as milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 }}
{{--reset-offsets }}
{{--group webserver-avro }}
{{--topic driver-positions-avro }}
{{--to-datetime "2020-11-05T00:46:48.002237400" }}
{{--dry-run}}

Relevant code 
[here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
 The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.

Experimenting with getDateTime:
 * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
 * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
formatting string allows for ZZZ timezones
 * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends 
with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
interprets "000123" as 123 milliseconds. See the stackoverflow answer 
[here|https://stackoverflow.com/a/21235602/109102].

The fix?  Drop any digits beyond the first 3 after the decimal point, or raise 
an exception. The code would still need to allow the RFC822 timezone, i.e. 
Sign TwoDigitHours Minutes.

 

  was:
If you pass more than 3 decimal places for the fractional seconds of the 
datetime, the microseconds get interpreted as milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{--reset-offsets \}}
{{--group webserver-avro \}}
{{--topic driver-positions-avro \}}
{{--to-datetime "2020-11-05T00:46:48.002237400" \}}
{{--dry-run}}

Relevant code 
[here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
 The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.

Experimenting with getDateTime:
 * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
 * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
formatting string allows for ZZZ timezones
 * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends 
with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
interprets "000123" as 123 milliseconds. See the stackoverflow answer 
[here|https://stackoverflow.com/a/21235602/109102].

The fix?  Drop any digits beyond the first 3 after the decimal point, or raise 
an exception. The code would still need to allow the RFC822 timezone, i.e. 
Sign TwoDigitHours Minutes.

 


> --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
> -
>
> Key: KAFKA-10685
> URL: https://issues.apache.org/jira/browse/KAFKA-10685
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Russell Sayers
>Priority: Minor
>
> If you pass more than 3 decimal places for the fractional seconds of the 
> datetime, the microseconds get interpreted as milliseconds.
> {{kafka-consumer-groups --bootstrap-server kafka:9092 }}
> {{--reset-offsets }}
> {{--group webserver-avro }}
> {{--topic driver-positions-avro }}
> {{--to-datetime "2020-11-05T00:46:48.002237400" }}
> {{--dry-run}}
> Relevant code 
> [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
>  The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
> SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.
> Experimenting with getDateTime:
>  * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
>  * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the 
> formatting string allows for ZZZ timezones
>  * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this 
> ends with 123 milliseconds.
> The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
> interprets "000123" as 123 milliseconds. See the stackoverflow answer 
> [here|https://stackoverflow.com/a/21235602/109102].
> The fix?  Drop any digits beyond the first 3 after the decimal point, or 
> raise an exception. The code would still need to allow the RFC822 timezone, 
> i.e. Sign TwoDigitHours Minutes.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226507#comment-17226507
 ] 

Sarita commented on KAFKA-7500:
---

Was able to fix the bootstrap-server disconnect issue. Went through all the 
logs and noticed that the SASL mechanism used by the 
producer/consumer/admin client/MirrorMaker connectors was not the same. Had to 
manually set the SASL mechanism for each of these configs to fix the issue. 
Below are the details of the worker configs and connector configs.




Worker configs are as follows:

```
bootstrap.servers=abc-broker-1:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="abc-broker-superuser" 
password="abc-broker-superuser-password";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required username="abc-broker-superuser" 
password="abc-broker-superuser-password";


group.id=connect-tails

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-test


config.storage.topic=connect-configs-test

status.storage.topic=connect-status-test


producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

```

 

 

 

 

Connector configs:
```
{
 "name": "MM9",
 "config": {
 "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
 "producer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='xyz-broker-superuser' password='xyz-broker-superuser-password';",
 "errors.log.include.messages": "true",
 "target.cluster.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='abc-broker-superuser' password='abc-broker-superuser-password';",
 "sync.topic.acls.enabled": "false",
 "tasks.max": "3",
 "source.cluster.producer.security.protocol": "SASL_SSL",
 "emit.checkpoints.interval.seconds": "1",
 "source.cluster.alias": "xyz-broker",
 "target.cluster.producer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='abc-broker-superuser' password='abc-broker-superuser-password';",
 "source.cluster.producer.sasl.mechanism": "PLAIN",
 "target.cluster.producer.bootstrap.servers": 
"abc-broker-3:9093,abc-broker-2:9093",
 "enabled": "true",
 "target.cluster.admin.bootstrap.servers": 
"abc-broker-3:9093,abc-broker-2:9093",
 "target.cluster.producer.security.protocol": "SASL_SSL",
 "target.cluster.security.protocol": "SASL_SSL",
 "target.cluster.consumer.sasl.mechanism": "PLAIN",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "errors.log.enable": "true",
 "source.cluster.admin.bootstrap.servers": 
"xyz-broker-1:9093,xyz-broker-2:9093",
 "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "clusters": "xyz-broker, abc-broker",
 "source.cluster.producer.bootstrap.servers": 
"xyz-broker-1:9093,xyz-broker-2:9093",
 "target.cluster.consumer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='abc-broker-superuser' password='abc-broker-superuser-password';",
 "producer.security.protocol": "SASL_SSL",
 "topics": "messaging_ops_mm8",
 "source.cluster.producer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='xyz-broker-superuser' password='xyz-broker-superuser-password';",
 "target.cluster.sasl.mechanism": "PLAIN",
 "source.cluster.consumer.security.protocol": "SASL_SSL",
 "groups": "consumer-group-.*",
 "source.cluster.consumer.sasl.mechanism": "PLAIN",
 "source.cluster.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='xyz-broker-superuser' password='xyz-broker-superuser-password';",
 "source.cluster.bootstrap.servers": "xyz-broker-1:9093,xyz-broker-2:9093",
 "source.cluster.sasl.mechanism": "PLAIN",
 "producer.sasl.mechanism": "PLAIN",
 "target.cluster.alias": "abc-broker",
 "target.cluster.consumer.security.protocol": "SASL_SSL",
 "task.class": "org.apache.kafka.connect.mirror.MirrorSourceTask",
 "target.cluster.consumer.bootstrap.servers": 
"abc-broker-3:9093,abc-broker-2:9093",
 "name": "MM9",
 "target.cluster.bootstrap.servers": "abc-broker-3:9093,abc-broker-2:9093",
 "emit.hear

[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651
 ] 

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:11 AM:
-

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. 

Worker configs are as follows:

```
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
```

When I run the command below, I can see the worker connector created:

```
nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &
```

When I try to create a connector using a POST call, I start seeing this message:

```
WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)
```

JSON content for the POST call is as below:

```
{
  "name": "MM9",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": 3,
    "topics": "messaging_ops_mm8",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "clusters": "xyz-broker, abc-broker",
    "source.cluster.alias": "xyz-broker",
    "target.cluster.alias": "abc-broker",
    "source.cluster.bootstrap.servers": "xyz-broker:9093",
    "source.cluster.security.protocol": "SASL_SSL",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser_password';",
    "source.cluster.ssl.truststore.password": "truststore_password",
    "source.cluster.ssl.truststore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",
    "source.cluster.ssl.keystore.password": "keystore_password",
    "source.cluster.ssl.keystore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks",
    "target.cluster.bootstrap.servers": "abc-broker:9093",
    "target.cluster.security.protocol": "SASL_SSL",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser_password';",
    "target.cluster.ssl.truststore.password": "truststore_password",
    "target.cluster.ssl.truststore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks",
    "target.cluster.ssl.keystore.password": "keystore_password",
    "target.cluster.ssl.keystore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks"
  }
}
```

Note the bootstrap-server added in the connect-distributed.properties file and the target cluster bootstrap-server are the same (abc-broker).

I have added all the SASL credentials and swapped the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. 

We are on Kafka version 0.11.0.3. 

What is it that we are missing?


was (Author: saritago):
Hi [~ryannedolan]

We are trying to set up MM2 with connection distributed setup. 

Worker configs are as follows
 ```
 bootstrap.servers=abc-broker:9093
 security.protocol=SASL_SSL
 sasl.mechanism=PLAIN
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="superuser" password="superuser_password";
 group.id=connect-tails
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apac

[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651
 ] 

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:12 AM:
-

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. 

Worker configs are as follows:

```
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
```

When I run the command below, I can see the worker connector created:

```
nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &
```

When I try to create a connector using a POST call, I start seeing this message:

```
WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)
```

JSON content for the POST call is as below:

```
{
  "name": "MM9",
  "config": Unknown macro: { "connector.class"}
}
```

Note the bootstrap-server added in the connect-distributed.properties file and the target cluster bootstrap-server are the same (abc-broker).

I have added all the SASL credentials and swapped the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. 

We are on Kafka version 0.11.0.3. 

What is it that we are missing?


was (Author: saritago):
Hi [~ryannedolan]

We are trying to set up MM2 with connection distributed setup. 

Worker configs are as follows

 {{{quote}}}

bootstrap.servers=abc-broker:9093
 security.protocol=SASL_SSL
 sasl.mechanism=PLAIN
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="superuser" password="superuser_password";
 group.id=connect-tails
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 offset.storage.topic=connect-offsets-test
 config.storage.topic=connect-configs-test
 status.storage.topic=connect-status-test
 offset.flush.interval.ms=30
 producer.buffer.memory=1234
 producer.ssl.truststore.password=truststore_password
 
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
 producer.ssl.keystore.password=keystore_password
 
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

 

{{{quote}}}

When i run below command, I can see worker connector created:

 

{{{quote}}}
 ```nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties 
&``` 

 

{{{quote}}}

When I try to create a connector using POST call, I start seeing message as 

 

{{{quote}}}

```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker 
abc-broker:9093 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1037)``` 

 

{{{quote}}}

json content for POST call is as below

 

{{{quote}}}
 ```
 {{  "name": "MM9",
 "config":

{ "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", 
"tasks.max": 3, "topics": "messaging_ops_mm8", "errors.log.enable": true, 
"errors.log.include.messages": true, "key.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter", "clusters": 
"xyz-broker, abc-broker", "source.cluster.alias": "xyz-broker", 
"target.cluster.alias": "abc-broker", "source.cluster.bootstrap.servers": 
"xyz-broker:90

[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651
 ] 

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:13 AM:
-

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. 

Worker configs are as follows:

```
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
```

When I run the command below, I can see the worker connector created:

```
nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &
```

When I try to create a connector using a POST call, I start seeing this message:

```
WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)
```

JSON content for the POST call is as below:

```
{
  "name": "MM9",
  "config": Unknown macro: { "connector.class"}
}
```

Note the bootstrap-server added in the connect-distributed.properties file and the target cluster bootstrap-server are the same (abc-broker).

I have added all the SASL credentials and swapped the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. 

We are on Kafka version 0.11.0.3. 

What is it that we are missing?


was (Author: saritago):
Hi [~ryannedolan]

We are trying to set up MM2 with connection distributed setup. 

Worker configs are as follows

 {{}}
{quote} {{_emphasis_}}

bootstrap.servers=abc-broker:9093
 security.protocol=SASL_SSL
 sasl.mechanism=PLAIN
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="superuser" password="superuser_password";
 group.id=connect-tails
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 offset.storage.topic=connect-offsets-test
 config.storage.topic=connect-configs-test
 status.storage.topic=connect-status-test
 offset.flush.interval.ms=30
 producer.buffer.memory=1234
 producer.ssl.truststore.password=truststore_password
 
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
 producer.ssl.keystore.password=keystore_password
 
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

 
 
{{_emphasis_}}
{quote}
 

When i run below command, I can see worker connector created:

 

 
{quote}{{
{{ ```nohup sh connect-distributed 
../etc/kafka/connect-distributed-1.properties &``` }}

 

{{
{quote}
 

When I try to create a connector using POST call, I start seeing message as 

 

 
{quote}{{

```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker 
abc-broker:9093 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1037)``` 

 

{{
{quote}
 

json content for POST call is as below

 

 
{quote}{{
{{ ```}}
{{ {{  "name": "MM9",}}
{{ "config":}}
Unknown macro: \{ "connector.class"}
}
 ```

 

{{
{quote}
 

Note the bootstrap-server added in connect-distributed.properties file and the 
target cluster bootstrap-server are same(abc-broker).

I have added all the SASL credentials, did a swap of source and destination 
cluster in the json but I continue to get this broker disconnected WARNING. 

We are on kafka version 0.11.0.3. 

What is it that we are missing?

> MirrorMaker 2.0 (KIP-382)
> ---

[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651
 ] 

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:13 AM:
-

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. 

Worker configs are as follows:

```
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
```

When I run the command below, I can see the worker connector created:

```
nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &
```

When I try to create a connector using a POST call, I start seeing this message:

```
WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)
```

JSON content for the POST call is as below:

```
{
  "name": "MM9",
  "config": Unknown macro: { "connector.class"}
}
```

Note the bootstrap-server added in the connect-distributed.properties file and the target cluster bootstrap-server are the same (abc-broker).

I have added all the SASL credentials and swapped the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. 

We are on Kafka version 0.11.0.3. 

What is it that we are missing?


was (Author: saritago):
Hi [~ryannedolan]

We are trying to set up MM2 with connection distributed setup. 

Worker configs are as follows

 

 

{{{quote}}}

bootstrap.servers=abc-broker:9093
 security.protocol=SASL_SSL
 sasl.mechanism=PLAIN
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="superuser" password="superuser_password";
 group.id=connect-tails
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 offset.storage.topic=connect-offsets-test
 config.storage.topic=connect-configs-test
 status.storage.topic=connect-status-test
 offset.flush.interval.ms=30
 producer.buffer.memory=1234
 producer.ssl.truststore.password=truststore_password
 
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
 producer.ssl.keystore.password=keystore_password
 
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

 

 

{{{quote}}}
  



 

When i run below command, I can see worker connector created:

 

 
{quote}{{
 \{{ ```nohup sh connect-distributed 
../etc/kafka/connect-distributed-1.properties &``` }}

 

{{
{quote}
 

When I try to create a connector using POST call, I start seeing message as 

 

 
{quote}{{

```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker 
abc-broker:9093 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1037)``` 

 

{{
{quote}
 

json content for POST call is as below

 

 
{quote}{{
 \{{ ```}}
 {{ {{  "name": "MM9",}}
 \{{ "config":}}
 Unknown macro: \{ "connector.class"}
 }
 ```

 

{{
{quote}
 

Note the bootstrap-server added in connect-distributed.properties file and the 
target cluster bootstrap-server are same(abc-broker).

I have added all the SASL credentials, did a swap of source and destination 
cluster in the json but I continue to get this broker disconnected WARNING. 

We are on kafka version 0.11.0.3. 

What is it that we are missing?

> MirrorMaker 2.0 (KIP-382)
> -

[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-11-04 Thread Sarita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651
 ] 

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:24 AM:
-

Hi [~ryannedolan]

We are trying to set up MM2 with the Connect distributed setup.

Worker configs are as follows:

{quote}

bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

 

 

{quote}
 

 

 

When I run the below command, I can see the worker connector created:

 

 
{quote}
nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &
{quote}
 

When I try to create a connector using a POST call, I start seeing this message:

 

 
{quote}
WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)
{quote}
 

The JSON content for the POST call is as below:

 

 
{quote}{{  "name": "MM9",
 "config": { "connector.class": 
"org.apache.kafka.connect.mirror.MirrorSourceConnector", 
"tasks.max": 3,
 "topics": "messaging_ops_mm8",
 "errors.log.enable": true,
 "errors.log.include.messages": true,
 "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", 
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", 
"clusters": "peach, tails",
 "source.cluster.alias": "peach", 
"target.cluster.alias": "tails", 
"source.cluster.bootstrap.servers": "xyz-broker:9093", 
"source.cluster.producer.bootstrap.servers": "xyz-broker:9093",
 "sync.topic.acls.enabled": "false",
 "emit.checkpoints.interval.seconds": "1",
 "groups": "consumer-group-.*",
 "source.cluster.admin.bootstrap.servers": "xyz-broker:9093", 
"source.cluster.consumer.bootstrap.servers": "xyz-broker:9093", 
"source.cluster.security.protocol": "SASL_SSL", 
"source.cluster.sasl.mechanism": "PLAIN", 
"source.cluster.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='superuser' password='superuser-password';",
 "source.cluster.ssl.truststore.password" : "truststorepassword",        
"source.cluster.ssl.truststore.location" : 
"/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",        
"source.cluster.ssl.keystore.password" : "keystorepassword",
  "source.cluster.ssl.keystore.location" : 
"/opt/projects/confluent/wildcard.kafka.iggroup.local.jks", 
"target.cluster.bootstrap.servers": "abc-broker:9093", 
"target.cluster.producer.bootstrap.servers": "abc-broker:9093", 
"enabled": "true",  
"target.cluster.admin.bootstrap.servers": "abc-broker:9093", 
"target.cluster.consumer.bootstrap.servers": "abc-broker:9093", 
"name": "MirrorSourceConnector", 
"emit.heartbeats.interval.seconds": "1",
 "target.cluster.security.protocol": "SASL_SSL",
 "target.cluster.sasl.mechanism": "PLAIN", 
"target.cluster.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username='superuser' password='superuser-password';", 
"target.cluster.ssl.truststore.password" : "truststorepassword",        
"target.cluster.ssl.truststore.location" : 
"/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",        
"target.cluster.ssl.keystore.password" : "keystore-password",        
"target.cluster.ssl.keystore.location" : 
"/opt/projects/confluent/wildcard.kafka.iggroup.local.jks" }}
{quote}
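
(For reference, a minimal Java sketch of this POST call against the Connect REST API; the worker address localhost:8083 and the file name mm2-connector.json are assumptions, not values from this report.)

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Minimal sketch: submit the connector JSON above to a Connect worker's REST API.
public class CreateMm2Connector {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))  // assumed worker address
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("mm2-connector.json")))  // assumed file name
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // A 201 response means the connector was accepted; the producer WARN above
        // shows up later in the worker log, not in this HTTP response.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```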
 

Note that the bootstrap-server added in the connect-distributed.properties file and the target cluster bootstrap-server are the same (abc-broker).

I have added all the SASL credentials and swapped the source and destination clusters in the JSON, but I continue to get this broker-disconnected WARNING.

We are on Kafka version 0.11.0.3.

What is it that we are missing?



[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-04 Thread GitBox


chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r517822478



##
File path: clients/src/main/resources/common/message/EnvelopeRequest.json
##
@@ -23,7 +23,7 @@
   "fields": [
 { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": 
true,
   "about": "The embedded request header and data."},
-{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", 
"zeroCopy": true, "nullableVersions": "0+",
+{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+",

Review comment:
   Is ```nullableVersions``` required? It seems there is no null handling/check in production code for this field.
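
   To illustrate the point: if the field keeps ```nullableVersions```, every reader of the field needs an explicit null guard. A minimal sketch with invented names (not Kafka's actual code path):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the null handling a nullable RequestPrincipal would require.
// Class, method, and argument names here are illustrative only.
public class NullablePrincipalExample {
    static String principalOrAnonymous(byte[] requestPrincipal) {
        if (requestPrincipal == null) {
            // Nullable field: must be handled explicitly, otherwise
            // deserializing the principal would hit a NullPointerException.
            return "ANONYMOUS";
        }
        return new String(requestPrincipal, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(principalOrAnonymous(null));  // prints ANONYMOUS
        System.out.println(principalOrAnonymous("User:admin".getBytes(StandardCharsets.UTF_8)));  // prints User:admin
    }
}
```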





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-04 Thread GitBox


chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r517824324



##
File path: clients/src/main/java/org/apache/kafka/common/network/SendBuilder.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class provides a way to build {@link Send} objects for network
+ * transmission from generated {@link org.apache.kafka.common.protocol.ApiMessage}
+ * types. Its main advantage over direct {@link ByteBuffer} allocation based on
+ * {@link org.apache.kafka.common.protocol.ApiMessage#size(ObjectSerializationCache, short)}
+ * is that it avoids copying "bytes" fields. The downside is that it is up to the caller
+ * to allocate a buffer which accounts only for the additional request overhead.
+ *
+ * See {@link org.apache.kafka.common.requests.EnvelopeRequest#toSend(String, RequestHeader)}
+ * for example usage.
+ */
+public class SendBuilder implements Writable {
+    private final List<ByteBuffer> buffers = new ArrayList<>();
+    private final ByteBuffer buffer;
+
+    public SendBuilder(ByteBuffer buffer) {

Review comment:
   The ```buffer``` is used/owned by ```SendBuilder``` only. It seems to me this constructor should accept a capacity (```int```) rather than a ```ByteBuffer```. (```SendBuilder``` should create the ```ByteBuffer``` during construction.)
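
   A minimal sketch of that suggestion, reusing the fields from the diff above (this is the reviewer's proposal, not necessarily the code as merged):

```java
// Sketch: SendBuilder allocates and owns its ByteBuffer, so callers pass only
// the capacity needed for the fixed request overhead.
public SendBuilder(int capacity) {
    this.buffer = ByteBuffer.allocate(capacity);
}
```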





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] chia7712 commented on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-04 Thread GitBox


chia7712 commented on pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#issuecomment-722184706


   Does this improvement work for other requests?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   >