[GitHub] [kafka] chia7712 commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings
chia7712 commented on pull request #9365: URL: https://github.com/apache/kafka/pull/9365#issuecomment-721617530 Is it similar to https://issues.apache.org/jira/browse/KAFKA-10090? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9547: KAFKA-9630; Migrate OffsetsForLeaderEpochRequest/Response to the auto-generated protocol
chia7712 commented on a change in pull request #9547: URL: https://github.com/apache/kafka/pull/9547#discussion_r517212012 ## File path: clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json ## @@ -32,13 +32,13 @@ { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+", "about": "Each topic to get offsets for.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", Review comment: The previous "name" was ```topic``` (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java#L25) and the "name" in the auto-generated protocol is ```name```. Does this break compatibility, for example in a cluster that mixes the auto-generated protocol and the old protocol?
[GitHub] [kafka] chia7712 commented on pull request #8953: MINOR: re-enable EosBetaUpgradeIntegrationTest
chia7712 commented on pull request #8953: URL: https://github.com/apache/kafka/pull/8953#issuecomment-721626362 ```EosBetaUpgradeIntegrationTest``` has become unstable recently. Maybe we should disable it again :(
[GitHub] [kafka] tombentley commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings
tombentley commented on pull request #9365: URL: https://github.com/apache/kafka/pull/9365#issuecomment-721626482 @chia7712 I think there's overlap, but I think this one removes additional warnings because it also propagates usage tracking in `SslFactory`. (It also provides a non-public abstraction to at least make it easier to identify the places where we end up doing this usage tracking, which might be useful should we ever figure out a better way of doing this).
[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
soarez commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-721637971 > Took way too long to get this merged. My fault for screwing up several times. Thanks for not giving up on this and all the time dedicated to reviewing.
[GitHub] [kafka] bbejeck commented on a change in pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs
bbejeck commented on a change in pull request #9554: URL: https://github.com/apache/kafka/pull/9554#discussion_r517339026 ## File path: docs/streams/developer-guide/testing.html ## @@ -73,67 +71,55 @@ // Processor API Topology topology = new Topology(); topology.addSource("sourceProcessor", "input-topic"); topology.addProcessor("processor", ..., "sourceProcessor"); -topology.addSink("sinkProcessor", "result-topic", "processor"); +topology.addSink("sinkProcessor", "output-topic", "processor"); // or // using DSL StreamsBuilder builder = new StreamsBuilder(); -builder.stream("input-topic").filter(...).to("result-topic"); +builder.stream("input-topic").filter(...).to("output-topic"); Review comment: good catch! Thanks
[GitHub] [kafka] bbejeck commented on a change in pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs
bbejeck commented on a change in pull request #9554: URL: https://github.com/apache/kafka/pull/9554#discussion_r517339602 ## File path: docs/streams/upgrade-guide.html ## @@ -135,14 +134,12 @@ Streams API tasks to their new owners in the background. Check out https://cwiki.apache.org/confluence/x/0i4lBg";>KIP-441 for full details, including several new configs for control over this new feature. - New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s) and end/terminal node(s) of a task. See https://cwiki.apache.org/confluence/x/gBkRCQ";>KIP-613 for more information. - -As of 2.6.0 Kafka Streams deprecates KStream.through() in favor of the new KStream.repartition() operator +As of 2.6.0 Kafka Streams deprecates KStream.through() if favor of the new KStream.repartition() operator Review comment: ack
[GitHub] [kafka] bbejeck merged pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs
bbejeck merged pull request #9554: URL: https://github.com/apache/kafka/pull/9554
[GitHub] [kafka] bbejeck commented on pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs
bbejeck commented on pull request #9554: URL: https://github.com/apache/kafka/pull/9554#issuecomment-721732656 Merged #9554 into trunk
[GitHub] [kafka] bbejeck commented on pull request #9554: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs
bbejeck commented on pull request #9554: URL: https://github.com/apache/kafka/pull/9554#issuecomment-721734486 cherry-picked to 2.7
[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
dajac commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517317354 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: It seems that `verbose` is not used anymore. I think that we were dumping the full Produce request and Fetch response when `verbose` was `true`. Should we just remove it? 
## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJso
[GitHub] [kafka] cadonna commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
cadonna commented on a change in pull request #9543: URL: https://github.com/apache/kafka/pull/9543#discussion_r517426911 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -145,7 +145,7 @@ private final String clientId; private final Metrics metrics; private final StreamsConfig config; -protected final StreamThread[] threads; +protected final ArrayList threads; Review comment: I would prefer to use `List` instead of `ArrayList` to be more generic. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } // create the stream thread, global update thread, and cleanup thread -threads = new StreamThread[numStreamThreads]; - +threads = new ArrayList<>(numStreamThreads); Review comment: I think it is better to keep the default initial capacity of an `ArrayList`. Otherwise, the first time a stream thread is added, we immediately run into a memory allocation. Since we do not know how many stream threads we might expect, let's use the default. We could also consider using a `LinkedList` since we never access by index in production code. 
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, stateDirectory, delegatingStateRestoreListener, i + 1); -threadState.put(threads[i].getId(), threads[i].state()); -storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); +threads.add(i, streamThread); +threadState.put(threads.get(i).getId(), threads.get(i).state()); +storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i))); } ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count())); +Math.toIntExact(Arrays.stream(threads.toArray(new StreamThread[numStreamThreads])).filter(thread -> thread.state().isAlive()).count())); Review comment: Please simplify to ```suggestion Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, stateDirectory, delegatingStateRestoreListener, i + 1); -threadState.put(threads[i].getId(), threads[i].state()); -storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); +threads.add(i, streamThread); +threadState.put(threads.get(i).getId(), threads.get(i).state()); +storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i))); Review comment: You can simplify to ```suggestion threads.add(streamThread); threadState.put(streamThread.getId(), streamThread.state()); storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); ```
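The review suggestions above (declare the field as `List`, keep the default capacity, and count alive threads with `threads.stream()`) can be illustrated with a minimal, self-contained sketch. `Worker` is a hypothetical stand-in for `StreamThread`, and `AliveThreadCount` and `countAlive` are names invented for this example; none of this is the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class AliveThreadCount {
    // Hypothetical stand-in for StreamThread; only the "alive" flag matters here.
    static final class Worker {
        private final boolean alive;

        Worker(boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }
    }

    // Counts alive workers by streaming the list directly,
    // without first copying it into an array.
    static int countAlive(List<Worker> threads) {
        return Math.toIntExact(threads.stream().filter(Worker::isAlive).count());
    }

    public static void main(String[] args) {
        // Declared as List, created with the default initial capacity.
        List<Worker> threads = new ArrayList<>();
        threads.add(new Worker(true));
        threads.add(new Worker(false));
        threads.add(new Worker(true));
        System.out.println(countAlive(threads));
    }
}
```

Because the field type is only `List`, the concrete implementation can later be swapped without touching callers such as `countAlive`.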
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage
vamossagar12 commented on a change in pull request #9539: URL: https://github.com/apache/kafka/pull/9539#discussion_r517462186 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, long currentTimeMs) { .map(follower -> new Voter().setVoterId(follower)) .collect(Collectors.toList()); +// Adding the leader to the voters as the protocol ensures that leader always votes for itself. +voters.add(new Voter().setVoterId(state.election().leaderId())); Review comment: Alright. Thanks @hachikuji , I will make the changes and update the PR.
[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
cadonna commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517464968 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java ## @@ -19,7 +19,7 @@ public final class StreamsAssignmentProtocolVersions { public static final int UNKNOWN = -1; public static final int EARLIEST_PROBEABLE_VERSION = 3; -public static final int LATEST_SUPPORTED_VERSION = 8; +public static final int LATEST_SUPPORTED_VERSION = 9; Review comment: Could you please also add the needed changes to system test `streams_upgrade_test.py::StreamsUpgradeTest.test_version_probing_upgrade` to this PR.
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage
vamossagar12 commented on a change in pull request #9539: URL: https://github.com/apache/kafka/pull/9539#discussion_r517462186 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, long currentTimeMs) { .map(follower -> new Voter().setVoterId(follower)) .collect(Collectors.toList()); +// Adding the leader to the voters as the protocol ensures that leader always votes for itself. +voters.add(new Voter().setVoterId(state.election().leaderId())); Review comment: Alright. Thanks @hachikuji . So, I need to add voters + voted for the current Leader in the LeaderChange message and see how to pass the latter to LeaderState.
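The diff quoted above builds a voter list from the followers and then appends the leader. A minimal sketch of that pattern, using plain `Integer` ids instead of the generated `Voter` class (the class and method names here are hypothetical, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class LeaderChangeVoters {
    // Builds the voter id list from the follower ids and then appends the leader id.
    // Collectors.toCollection(ArrayList::new) is used because Collectors.toList()
    // makes no guarantee that the returned list is mutable.
    static List<Integer> votersIncludingLeader(Set<Integer> followerIds, int leaderId) {
        List<Integer> voters = followerIds.stream()
            .sorted()
            .collect(Collectors.toCollection(ArrayList::new));
        voters.add(leaderId);
        return voters;
    }

    public static void main(String[] args) {
        Set<Integer> followers = new TreeSet<>();
        followers.add(2);
        followers.add(3);
        System.out.println(votersIncludingLeader(followers, 1));
    }
}
```

The sort is only there to make the illustration deterministic; whether ordering matters in the real message is not stated in the thread.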
[jira] [Assigned] (KAFKA-10679) AK site docs changes need to get ported to Kafka/docs
[ https://issues.apache.org/jira/browse/KAFKA-10679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-10679: --- Assignee: Bill Bejeck > AK site docs changes need to get ported to Kafka/docs > - > > Key: KAFKA-10679 > URL: https://issues.apache.org/jira/browse/KAFKA-10679 > Project: Kafka > Issue Type: Bug > Components: docs >Affects Versions: 2.7.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > During the update of the Apache Kafka website, changes made to the kafka-site > repo were not made to the kafka/docs directory. > All the changes made need to get migrated to kafka/docs to keep the website > in sync. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517481718 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +374,66 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException @NotNull if streamsUncaughtExceptionHandler is null. Review comment: I don't remember putting it there so it was probably a mistake
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517485257 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,6 +1064,72 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +// if transition failed, it means it was either in PENDING_SHUTDOWN +// or NOT_RUNNING already; just check that all threads have been stopped +log.info("Can not close to error from state " + state()); Review comment: That works
[jira] [Created] (KAFKA-10681) MM2 translateOffsets returns wrong offsets
Carlo Bongiovanni created KAFKA-10681:
-
Summary: MM2 translateOffsets returns wrong offsets
Key: KAFKA-10681
URL: https://issues.apache.org/jira/browse/KAFKA-10681
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.5.0
Environment: GKE, strimzi release
Reporter: Carlo Bongiovanni

Hi all,

we'd like to make use of the ability of MM2 to mirror checkpoints of consumer offsets, in order to have a graceful failover from an active cluster to a standby one. For this reason we have created the following setup (FYI all done with strimzi on k8s):

* an active kafka cluster 2.5.0 used by a few producers/consumers
* a standby kafka cluster 2.5.0
* MM2 is set up in one direction only, to mirror from active to standby

We have let MM2 run for some time and we could verify that messages are effectively mirrored. At this point we have started developing the tooling to create consumer groups in the consumer-offsets topic of the passive cluster, by reading the internal checkpoints topic.

The following is an extract of our code to read the translated offsets:

{code:java}
Map<String, Object> mm2Props = new HashMap<>();
mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
mm2Props.put("source.cluster.alias", "euwe");
mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
mm2Props.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";");
mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/usr/local/lib/jdk/lib/security/cacerts");
mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
    .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), cgi, Duration.ofSeconds(60L));
{code}

Before persisting the translated offsets with

{code:java}
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
    .alterConsumerGroupOffsets(cgi, offsets);
{code}

we filter them because we don't want to create consumer groups for all retrieved offsets. During the filtering, we compare the value of the translated offset for each topic partition (as coming from the checkpoint topic) with the respective current offset value for each topic partition (as mirrored by MM2).

While running this check we found that for some topics there is a big difference between those values, while for other topics the update seems realistic. For example, looking at a given target partition we see it has an offset of 100 (after mirroring by MM2). From the checkpoint topic, for the same consumer group id, we receive offset 200, and later 150.

The issues are that:

* both consumer group id offsets exceed the real offset of the partition
* the consumer group id offsets from the checkpoint topic go down over time, not up

We haven't been able to explain it. The wrong numbers are coming from *RemoteClusterUtils.translateOffsets()* and we're wondering if this could be a misconfiguration on our side or a bug in MM2.

Thanks, best
C.
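The filtering step described in the report (comparing each translated offset with the current offset of the target partition) could be sketched as follows. This is only an illustration under assumptions: plain `String` keys and `Long` values stand in for `TopicPartition` and `OffsetAndMetadata`, and the class and method names are hypothetical rather than taken from the reporter's tooling.

```java
import java.util.HashMap;
import java.util.Map;

final class TranslatedOffsetFilter {
    // Keeps only the translated offsets that do not exceed the known end offset
    // of the corresponding target partition. Partitions without a known end
    // offset are dropped as well.
    static Map<String, Long> plausible(Map<String, Long> translated, Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : translated.entrySet()) {
            Long end = endOffsets.get(entry.getKey());
            if (end != null && entry.getValue() <= end) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> translated = new HashMap<>();
        translated.put("topic-a-0", 200L); // exceeds the end offset below, as in the report
        translated.put("topic-b-0", 50L);

        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("topic-a-0", 100L);
        endOffsets.put("topic-b-0", 80L);

        System.out.println(plausible(translated, endOffsets));
    }
}
```

With the numbers from the report (end offset 100, translated offset 200), the entry for that partition would be filtered out rather than written with `alterConsumerGroupOffsets`.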
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
wcarlson5 commented on a change in pull request #9543: URL: https://github.com/apache/kafka/pull/9543#discussion_r517516580 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } // create the stream thread, global update thread, and cleanup thread -threads = new StreamThread[numStreamThreads]; - +threads = new ArrayList<>(numStreamThreads); Review comment: That is fair
[GitHub] [kafka] wcarlson5 commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
wcarlson5 commented on pull request #9543: URL: https://github.com/apache/kafka/pull/9543#issuecomment-721881768 @cadonna Looks like all those changes made sense
[jira] [Assigned] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets
[ https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-7556: - Assignee: Justine Olshan > KafkaConsumer.beginningOffsets does not return actual first offsets > --- > > Key: KAFKA-7556 > URL: https://issues.apache.org/jira/browse/KAFKA-7556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0 >Reporter: Robert V >Assignee: Justine Olshan >Priority: Critical > Labels: documentation, usability > > h2. Description of the problem > The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` > claims in its Javadoc documentation that it would 'Get the first offset for > the given partitions.'. > I used it with a compacted topic, and it always returned offset 0 for all > partitions. > Not sure if using a compacted topic actually matters, but I'm enclosing this > information anyway. > Given a Kafka topic with retention set, and old log files being removed as a > result of that, the effective start offset of those partitions moves forward; > it simply will be greater than offset 0. > However, calling the `beginningOffsets` method always returns offset 0 as the > first offset. > In contrast, when the method > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called > with a timestamp of 0L (UNIX epoch 1st Jan, 1970), it correctly returns the > effective start offsets for each partition. 
> Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: > {code:java} > {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, > test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, > test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, > test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, > test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, > test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, > test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, > test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, > test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, > test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, > test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, > test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, > test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, > test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, > test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, > test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, > test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, > test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, > test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, > test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, > test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, > test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, > test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, > test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, > test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0} > {code} > Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`: > {code:java} > {test.topic-87=(timestamp=1511264434285, 
offset=289), > test.topic-54=(timestamp=1511265134993, offset=45420), > test.topic-21=(timestamp=1511265534207, offset=63643), > test.topic-79=(timestamp=1511270338275, offset=380750), > test.topic-46=(timestamp=1511266883588, offset=266379), > test.topic-13=(timestamp=1511265900538, offset=98512), > test.topic-70=(timestamp=1511266972452, offset=118522), > test.topic-37=(timestamp=1511264396370, offset=763), > test.topic-12=(timestamp=1511265504886, offset=61108), > test.topic-95=(timestamp=1511289492800, offset=847647), > test.topic-62=(timestamp=1511265831298, offset=68299), > test.topic-29=(timestamp=1511278767417, offset=548361), > test.topic-4=(timestamp=1511269316679, offset=144855), > test.topic-88=(timestamp=1511265608468, offset=107831), > test.topic-55=(timestamp=1511267449288, offset=129241), > test.topic-22=(timestamp=1511283134114, offset=563095), > test.topic-80=(timestamp=1511277334877, offset=534859), > test.topic-47=(timestamp=1511265530689, offset=71608), > test.topic-14=(timestamp=1511266308829, offset=80962), > test.topic-71=(timestamp=1511265474740, offset=83607), > test.topic-38=(timestamp=1511268268259, offset=166460), > test.topic-5=(timestamp=1511276243850, offset=294307), > test.topic-96=(timesta
[GitHub] [kafka] vvcephei commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
vvcephei commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517539432

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 }
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[numStreamThreads];
-
+threads = new LinkedList<>();

Review comment:
Should this collection be threadsafe? (or are all accesses inside synchronized blocks anyway?)

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
```suggestion
storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
```
`get(i)` is also O(n) for a linked list. Again, this is admittedly a nit, since the list is small.

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 stateDirectory,
 delegatingStateRestoreListener,
 i + 1);
-threadState.put(threads[i].getId(), threads[i].state());
-storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+threads.add(i, streamThread);

Review comment:
A bit of a nitpick, but this operation is O(n) for LinkedList. Better to just `add(streamThread)` if you want to use LinkedList.
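The review comments above can be illustrated with a small standalone sketch. It is not the code from the pull request: `Worker` is a hypothetical stand-in for `StreamThread`, and wrapping the list with `Collections.synchronizedList` is only one assumed answer to the thread-safety question. The sketch appends with `add(worker)` and reuses the local reference instead of calling `get(i)`, as the suggestions propose.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class ThreadListSketch {
    // Hypothetical stand-in for StreamThread; it only carries an id.
    public static final class Worker {
        final int id;

        Worker(int id) {
            this.id = id;
        }
    }

    // Builds `count` workers and returns the ids that were registered.
    public static List<Integer> registerWorkers(int count) {
        // Assumption: a synchronized wrapper is one way to make the list safe
        // for concurrent access; the PR may instead rely on synchronized blocks.
        List<Worker> threads = Collections.synchronizedList(new LinkedList<>());
        List<Integer> registeredIds = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Worker worker = new Worker(i + 1);
            threads.add(worker);           // append at the tail, no index argument
            registeredIds.add(worker.id);  // reuse the local reference, no threads.get(i)
        }
        return registeredIds;
    }

    public static void main(String[] args) {
        System.out.println(registerWorkers(3));
    }
}
```

Reusing the local variable keeps the loop independent of how the list implements indexed access, so the same code works unchanged if the collection type is swapped later.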
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517543638

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
## @@ -19,7 +19,7 @@ public final class StreamsAssignmentProtocolVersions {
 public static final int UNKNOWN = -1;
 public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;

Review comment:
Can you also leave a comment here reminding us to fix the version probing system test whenever this protocol number is bumped? Since we apparently always forget
[GitHub] [kafka] ableegoldman commented on pull request #8953: MINOR: re-enable EosBetaUpgradeIntegrationTest
ableegoldman commented on pull request #8953: URL: https://github.com/apache/kafka/pull/8953#issuecomment-721896700 I believe @mjsax is planning to look into it after his current work, but it seems reasonable to disable it again until then
[GitHub] [kafka] niteshmor opened a new pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor opened a new pull request #9556:
URL: https://github.com/apache/kafka/pull/9556

Jetty 9.4.32 and before are affected by CVE-2020-27216. This vulnerability is fixed in Jetty 9.4.33, please see the jetty project security advisory for details: https://github.com/eclipse/jetty.project/security/advisories/GHSA-g3wg-6mcf-8jj6#advisory-comment-63053

Unit tests and integration tests pass locally after the upgrade.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-721898016

For applying this fix to older branches, this commit can be cherry-picked to `2.7` branch. For branches `2.6` and earlier, `jersey` version also needs to be upgraded to `2.31` based on the following note in `2.5` and `2.6` branches that are on jetty `9.4.24`: https://github.com/apache/kafka/blob/2.6/gradle/dependencies.gradle#L71

For validating that this change can be backported, I have prepared commits in my own fork for older branches going back till 2.4 and ran the tests locally. To assist in applying the fix to older branches, these can serve as a reference for what the exact change is. Or I can create new pull requests to the older branches if that's easier.

- For 2.7, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/8b51bd18f4541d1edb7d423a01314f78f1988fe8
- For 2.6, 2 tests failed locally, but I believe they are unrelated. The failed tests are:
  - `SslAdminIntegrationTest.testCreateTopicsResponseMetadataAndConfig`
  - `ReplicaManagerTest.testFencedErrorCausedByBecomeLeader`

  Reference commit: https://github.com/niteshmor/kafka/commit/a0d3dc0f2e490d04b324ad72c47cd1363cc9dd6c
- For 2.5, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/f8f68c334132dd293d96d5be1a1c3d307896cefa
- For 2.4, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/9c08944c3c793c7733e0971a68dcb580dd4c288f
[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor commented on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-721900447 cc @ijuma @kkonstantine @rhauch @C0urante
[jira] [Created] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks
navin created KAFKA-10682:

Summary: Windows Kafka cluster not reachable via Azure data Bricks
Key: KAFKA-10682
URL: https://issues.apache.org/jira/browse/KAFKA-10682
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 2.6.0
Reporter: navin

We have a Windows Kafka cluster.
* We enabled inbound and outbound traffic for ports 9092/9093
* The topic returns results when consumed from a local Windows cmd with:
** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
* We are trying to consume the topic from Azure Databricks
** df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
    .option("subscribe", "SIP.SIP.SHIPMENT") \
    .option("minPartitions", "10") \
    .option("startingOffsets", "earliest") \
    .load()
#df.isStreaming() # Returns True for DataFrames that have streaming sources
df.printSchema()
** Display(df)

Some time after running the display command, we got the error below:

Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.
What we see in Logs is below error 20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)java.net.UnknownHostException: Navin.us.corp.tim.com at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569) at org.apache.spark.sql.kafka010.KafkaOffsetReade
[jira] [Updated] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks
[ https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

navin updated KAFKA-10682:

Description:
We have a Windows Kafka cluster.
* We enabled inbound and outbound traffic for ports 9092/9093
* The topic returns results when consumed from a local Windows cmd with:
** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
* We are trying to consume the topic from Azure Databricks
** Simple ping and telnet work fine and connect to the underlying server
** df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
    .option("subscribe", "SIP.SIP.SHIPMENT") \
    .option("minPartitions", "10") \
    .option("startingOffsets", "earliest") \
    .load()
#df.isStreaming() # Returns True for DataFrames that have streaming sources
df.printSchema()
** Display(df)

Some time after running the display command, we got the error below:

Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.
What we see in Logs is below error 20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)java.net.UnknownHostException: Navin.us.corp.tim.com at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569) at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetRe
[jira] [Updated] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks
[ https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

navin updated KAFKA-10682:

Description:
We have a Windows Kafka cluster.
* We enabled inbound and outbound traffic for ports 9092/9093
* The topic returns results when consumed from a local Windows cmd with:
** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
* We are trying to consume the topic from Azure Databricks
** Simple ping and telnet work fine and connect to the underlying server
*** %sh telnet 10.53.56.140 9092
*** %sh ping 10.53.56.140
** df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
    .option("subscribe", "SIP.SIP.SHIPMENT") \
    .option("minPartitions", "10") \
    .option("startingOffsets", "earliest") \
    .load()
#df.isStreaming() # Returns True for DataFrames that have streaming sources
df.printSchema()
** Display(df)

Some time after running the display command, we got the error below:

Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.
What we see in Logs is below error 20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)java.net.UnknownHostException: Navin.us.corp.tim.com at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569) at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReade
[GitHub] [kafka] cmccabe opened a new pull request #9557: Kip 500 move legacy
cmccabe opened a new pull request #9557: URL: https://github.com/apache/kafka/pull/9557
[GitHub] [kafka] cmccabe closed pull request #9557: Kip 500 move legacy
cmccabe closed pull request #9557: URL: https://github.com/apache/kafka/pull/9557
[GitHub] [kafka] mikebin commented on pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner
mikebin commented on pull request #8690: URL: https://github.com/apache/kafka/pull/8690#issuecomment-721907104 Looks like this is still pending completion? Just wanted to check on status of getting this merged.
[jira] [Commented] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks
[ https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226342#comment-17226342 ]

navin commented on KAFKA-10682:

Uncommented the setting below in server.properties, located in kafka\config; the issue got resolved.

listeners = PLAINTEXT://10.53.56.140:9092

> Windows Kafka cluster not reachable via Azure data Bricks
>
> Key: KAFKA-10682
> URL: https://issues.apache.org/jira/browse/KAFKA-10682
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.6.0
> Reporter: navin
> Priority: Minor
>
> We have windows Kafka cluster,
> * We enabled inbound and outbound for port 9092/9093
> * Topic return results on local windows cmd used
> ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning
> --bootstrap-server 10.53.56.140:9092
> * We trying to consume the topic from Azure data bricks
> ** Simple ping and telnet works fine and connects to underlying server
> *** %sh telnet 10.53.56.140 9092
> *** %sh ping 10.53.56.140
> ** df = spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
> .option("subscribe", "SIP.SIP.SHIPMENT") \
> .option("minPartitions", "10") \
> .option("startingOffsets", "earliest") \
> .load()
> #df.isStreaming() # Returns True for DataFrames that have streaming sources
> df.printSchema()
> *
> ** Display(df)
> On using display command after before amount of time we got below error:
> Lost connection to cluster. The notebook may have been detached or the
> cluster may have been terminated due to an error in the driver such as an
> OutOfMemoryError.
> What we see in Logs is below error > 20/11/04 18:23:52 WARN NetworkClient: [Consumer > clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, > > groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] > Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: > null)20/11/04 18:23:52 WARN NetworkClient: [Consumer > clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, > > groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] > Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: > null)java.net.UnknownHostException: Navin.us.corp.tim.com at > java.net.InetAddress.getAllByName0(InetAddress.java:1281) at > java.net.InetAddress.getAllByName(InetAddress.java:1193) at > java.net.InetAddress.getAllByName(InetAddress.java:1127) at > kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) > at > kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) > at > kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) > at > 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) > at > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scal
[jira] [Resolved] (KAFKA-10682) Windows Kafka cluster not reachable via Azure data Bricks
[ https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

navin resolved KAFKA-10682.

Resolution: Fixed

kafka\config Add listeners = PLAINTEXT://10.53.56.140:9092

> Windows Kafka cluster not reachable via Azure data Bricks
>
> Key: KAFKA-10682
> URL: https://issues.apache.org/jira/browse/KAFKA-10682
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.6.0
> Reporter: navin
> Priority: Minor
>
> We have windows Kafka cluster,
> * We enabled inbound and outbound for port 9092/9093
> * Topic return results on local windows cmd used
> ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning
> --bootstrap-server 10.53.56.140:9092
> * We trying to consume the topic from Azure data bricks
> ** Simple ping and telnet works fine and connects to underlying server
> *** %sh telnet 10.53.56.140 9092
> *** %sh ping 10.53.56.140
> ** df = spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
> .option("subscribe", "SIP.SIP.SHIPMENT") \
> .option("minPartitions", "10") \
> .option("startingOffsets", "earliest") \
> .load()
> #df.isStreaming() # Returns True for DataFrames that have streaming sources
> df.printSchema()
> *
> ** Display(df)
> On using display command after before amount of time we got below error:
> Lost connection to cluster. The notebook may have been detached or the
> cluster may have been terminated due to an error in the driver such as an
> OutOfMemoryError.
> What we see in Logs is below error > 20/11/04 18:23:52 WARN NetworkClient: [Consumer > clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, > > groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] > Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: > null)20/11/04 18:23:52 WARN NetworkClient: [Consumer > clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, > > groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] > Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: > null)java.net.UnknownHostException: Navin.us.corp.tim.com at > java.net.InetAddress.getAllByName0(InetAddress.java:1281) at > java.net.InetAddress.getAllByName(InetAddress.java:1193) at > java.net.InetAddress.getAllByName(InetAddress.java:1127) at > kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) > at > kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) > at > kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) > at > kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) > at > kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) > at > 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) > at > kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) > at > kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) > at > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540) > at > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(Kafk
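The quoted log fails inside `InetAddress.getAllByName` on `Navin.us.corp.tim.com`, even though the bootstrap address was given as an IP. One reading, which the thread does not state explicitly, is that the broker advertised a hostname the Databricks side could not resolve, and that pinning `listeners` to the IP avoided the lookup. A minimal sketch for checking whether a given broker host resolves from the client machine follows; the class and method names are made up for illustration, and only the standard `java.net.InetAddress` API is used.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedHostCheck {
    // Returns true if the host name (or IP literal) resolves on this machine.
    // This is the same lookup call that appears at the top of the stack trace.
    public static boolean resolves(String host) {
        try {
            return InetAddress.getAllByName(host).length > 0;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the IP from the report; pass the advertised hostname
        // as the first argument to test name resolution instead.
        String host = args.length > 0 ? args[0] : "10.53.56.140";
        System.out.println(host + " resolvable: " + resolves(host));
    }
}
```

Running this from the client environment with the hostname that appears in the `Error connecting to node` line shows whether the failure is a name-resolution problem rather than a blocked port. An IP literal always passes this check because no DNS lookup is needed for it.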
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517562348 ## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ## @@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions target.sourceVariable(), target.sourceVariable(; } } else if (target.field().type().isRecords()) { -headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS); -buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})")); +headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS); +buffer.printf("%s;%n", target.assignmentStatement( +String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable(; Review comment: @dajac if I understand correctly, the current generated JSON does not print the recordSet either. Is it breaking anything to return the size rather than an empty array? ``` } if (_object.recordSet == null) { _node.set("recordSet", NullNode.instance); } else { _node.set("recordSet", new BinaryNode(new byte[]{})); } ``` That said, a verbose mode where we print out a byte array seems reasonable if we don't use it in the trace logging path by default? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517576758 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +317,22 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException ); +} catch (final Exception e) { +if (this.streamsUncaughtExceptionHandler == null) { +throw e; +} +if (Thread.getDefaultUncaughtExceptionHandler() != null && newHandler) { +log.error("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); Review comment: I think it is simpler to check in the stream thread, because in KafkaStreams we don't know whether the handlers have been set. We would have to check both the stream thread and the global thread, so it is much easier to just check in the thread. I do agree that it should be bumped down to warn, though.
[GitHub] [kafka] bbejeck merged pull request #9544: MINOR: Add back 2.6 notable update section taken out by mistake
bbejeck merged pull request #9544: URL: https://github.com/apache/kafka/pull/9544
[GitHub] [kafka] bbejeck commented on pull request #9544: MINOR: Add back 2.6 notable update section taken out by mistake
bbejeck commented on pull request #9544: URL: https://github.com/apache/kafka/pull/9544#issuecomment-721937375 Merged #9544 into trunk
[GitHub] [kafka] rajinisivaram commented on pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures
rajinisivaram commented on pull request #7751: URL: https://github.com/apache/kafka/pull/7751#issuecomment-721940645 Streams test failure not related, merging to trunk.
[GitHub] [kafka] rajinisivaram merged pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures
rajinisivaram merged pull request #7751: URL: https://github.com/apache/kafka/pull/7751
[GitHub] [kafka] rajinisivaram closed pull request #9550: [DO NOT MERGE] Temporary PR to track test failure in Jenkins build
rajinisivaram closed pull request #9550: URL: https://github.com/apache/kafka/pull/9550
[jira] [Created] (KAFKA-10683) Consumer.position() Ignores Transaction Marker with read_uncommitted
Gary Russell created KAFKA-10683:
Summary: Consumer.position() Ignores Transaction Marker with read_uncommitted
Key: KAFKA-10683
URL: https://issues.apache.org/jira/browse/KAFKA-10683
Project: Kafka
Issue Type: Bug
Components: clients, core
Affects Versions: 2.6.0
Reporter: Gary Russell

The workaround for https://issues.apache.org/jira/browse/KAFKA-6607# Says:
{quote}
or use `consumer.position()` that takes the commit marker into account and would "step over it")
{quote}
Note that this problem occurs with all consumers, not just Streams. We have implemented this solution in our project (as an option for those users concerned about the pseudo lag).

We have discovered that this technique will only work with
{code}isolation.level=read_committed{code}
Otherwise, the {code}position(){code} call does not include the marker "record".

https://github.com/spring-projects/spring-kafka/issues/1587#issuecomment-721899560

-- This message was sent by Atlassian Jira (v8.3.4#803005)
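The "pseudo lag" described in KAFKA-10683 can be illustrated with a little offset arithmetic. The following is only a toy model, not Kafka client code: the class and method names are hypothetical, it assumes the commit marker occupies exactly one offset after the last record, and the read_uncommitted behaviour is modelled as the report describes it rather than independently verified.

```java
public class PseudoLagSketch {
    // Offset a consumer usually commits after processing a record: last offset + 1.
    static long commitAfterRecord(long lastRecordOffset) {
        return lastRecordOffset + 1;
    }

    // Toy model of position() as described in the report (an assumption):
    // the marker is stepped over only when isolation.level=read_committed.
    static long modelPosition(long lastRecordOffset, boolean readCommitted) {
        long afterRecord = lastRecordOffset + 1;
        return readCommitted ? afterRecord + 1 : afterRecord;
    }

    // Lag as a monitoring tool would typically compute it.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        long lastRecord = 9;  // records at offsets 0..9
        long logEnd = 11;     // commit marker assumed at offset 10
        // Committing "last record + 1" leaves a lag of 1 although nothing is unread.
        System.out.println(lag(logEnd, commitAfterRecord(lastRecord)));
        // Committing the modelled position() under read_committed removes it.
        System.out.println(lag(logEnd, modelPosition(lastRecord, true)));
        // Under read_uncommitted the modelled position() does not help.
        System.out.println(lag(logEnd, modelPosition(lastRecord, false)));
    }
}
```

In this model the three printed values are 1, 0 and 1, which matches the report that the workaround only helps with read_committed.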
[jira] [Resolved] (KAFKA-7987) a broker's ZK session may die on transient auth failure
[ https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7987.
Fix Version/s: 2.8.0
Reviewer: Jun Rao
Resolution: Fixed

> a broker's ZK session may die on transient auth failure
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Priority: Critical
> Fix For: 2.8.0
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's ZK session then expired, but the broker didn't know that yet since it couldn't establish a connection to ZK. When the network was back, the broker tried to establish a connection to ZK, but failed due to auth failure (likely due to a transient KDC issue). The current logic just ignores the auth failure without trying to create a new ZK session. Then the broker will be permanently in a state that it's alive, but not registered in ZK.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517621757 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -294,7 +304,10 @@ public static StreamThread create(final InternalTopologyBuilder builder, final long cacheSizeBytes, final StateDirectory stateDirectory, final StateRestoreListener userStateRestoreListener, - final int threadIdx) { + final int threadIdx, + final ShutdownErrorHook shutdownErrorHook, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, + final AtomicInteger assignmentErrorCode) { Review comment: You are right, it seems that it is not necessary.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517627843 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java ## @@ -19,7 +19,7 @@ public final class StreamsAssignmentProtocolVersions { public static final int UNKNOWN = -1; public static final int EARLIEST_PROBEABLE_VERSION = 3; -public static final int LATEST_SUPPORTED_VERSION = 8; +public static final int LATEST_SUPPORTED_VERSION = 9; Review comment: Thanks for the reminder. I think I understood the test as incrementing to the next version, as the version is now 9.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
wcarlson5 commented on a change in pull request #9543: URL: https://github.com/apache/kafka/pull/9543#discussion_r517629961 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } // create the stream thread, global update thread, and cleanup thread -threads = new StreamThread[numStreamThreads]; - +threads = new LinkedList<>(); Review comment: They are not all in synchronized blocks so I think it probably should be thread safe
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
wcarlson5 commented on a change in pull request #9543: URL: https://github.com/apache/kafka/pull/9543#discussion_r517630194 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, stateDirectory, delegatingStateRestoreListener, i + 1); -threadState.put(threads[i].getId(), threads[i].state()); -storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); +threads.add(i, streamThread); Review comment: yep, should have changed that when I moved from ArrayList
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable
wcarlson5 commented on a change in pull request #9543: URL: https://github.com/apache/kafka/pull/9543#discussion_r517630320 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, stateDirectory, delegatingStateRestoreListener, i + 1); -threadState.put(threads[i].getId(), threads[i].state()); -storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); +threads.add(i, streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i))); Review comment: Good catch
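The thread-safety concern raised on #9543 (a resizable thread list that is not always accessed inside synchronized blocks) could be addressed in several ways. The following is a minimal sketch of one option, wrapping the list with `Collections.synchronizedList`; the class and method names are hypothetical and this is not necessarily what the PR ended up doing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class ResizableThreadListSketch {
    // Every single call on the wrapper is synchronized on the wrapper itself.
    private final List<Thread> threads = Collections.synchronizedList(new LinkedList<>());

    void add(Thread thread) {
        threads.add(thread);
    }

    // Iteration is not atomic on a synchronized wrapper, so copy under its lock.
    List<Thread> snapshot() {
        synchronized (threads) {
            return new ArrayList<>(threads);
        }
    }

    // Hypothetical helper: several writers append concurrently, then we count.
    static int addConcurrently(int workers, int perWorker) {
        ResizableThreadListSketch registry = new ResizableThreadListSketch();
        List<Thread> writers = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread writer = new Thread(() -> {
                for (int i = 0; i < perWorker; i++) {
                    registry.add(new Thread(() -> { })); // placeholder, never started
                }
            });
            writers.add(writer);
            writer.start();
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return registry.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println(addConcurrently(4, 100));
    }
}
```

With a plain `LinkedList` the concurrent appends could lose elements or corrupt the list; with the wrapper no append is lost. Compound operations such as check-then-add still need an explicit lock.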
[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-721987780

@chia7712 : Thanks for the PR. I ran the following command with the PR.

bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config kafka.properties --create --topic test1

kafka.properties
```
security.protocol=SSL
ssl.protocol=TLS
```
I still saw the WARN.
`[2020-11-04 13:32:16,059] WARN The configuration 'ssl.protocol' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)`
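One way a warning like the one quoted above can arise is when a config object tracks which supplied keys were actually read and reports the rest. The following is a toy model under that assumption, with hypothetical names; it is not the Kafka `AdminClientConfig` implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class UnusedConfigSketch {
    private final Map<String, String> originals;
    private final Set<String> used = new HashSet<>();

    UnusedConfigSketch(Map<String, String> supplied) {
        this.originals = new HashMap<>(supplied);
    }

    // Reading a key marks it as used.
    String get(String key) {
        used.add(key);
        return originals.get(key);
    }

    // Keys that were supplied but never read; these would trigger the warning.
    Set<String> unused() {
        Set<String> keys = new TreeSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    // Hypothetical scenario mirroring the kafka.properties file in the comment.
    static Set<String> demo() {
        Map<String, String> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.protocol", "TLS");
        UnusedConfigSketch config = new UnusedConfigSketch(props);
        config.get("security.protocol"); // read eagerly
        // ssl.protocol is never read in this model, e.g. because the component
        // that needs it has not been created when the check runs.
        return config.unused();
    }

    public static void main(String[] args) {
        for (String key : demo()) {
            System.out.println("WARN The configuration '" + key
                    + "' was supplied but isn't a known config.");
        }
    }
}
```

In this model the warning is misleading whenever a valid key is simply read later than the check, which is the kind of false positive the PR title refers to.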
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517645718 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: Yes, I kept it because `requestDescMetrics` originally had the `detailsEnabled`, but I'll remove it since none of the JsonConverters differentiates between verbose and non-verbose. 
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517649219 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsReq
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: Just read through the comment below, so I'll keep it for now. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] niteshmor edited a comment on pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor edited a comment on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-721898016

For applying this fix to older branches, this commit can be cherry-picked to `2.7` branch. For branches `2.6` and earlier, `jersey` version also needs to be upgraded to `2.31` based on the following note in `2.5` and `2.6` branches that are on jetty `9.4.24`: https://github.com/apache/kafka/blob/2.6/gradle/dependencies.gradle#L71

For validating that this change can be backported, I have prepared commits in my own fork for older branches going back till 2.4 and ran the tests locally. Note that I only used Java 11 in my local testing. To assist in applying the fix to older branches, these can serve as a reference for what the exact change is. Or I can create new pull requests to the older branches if that's easier.

- For 2.7, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/8b51bd18f4541d1edb7d423a01314f78f1988fe8
- For 2.6, 2 tests failed locally, but I believe they are unrelated. The failed tests are:
  - `SslAdminIntegrationTest.testCreateTopicsResponseMetadataAndConfig`
  - `ReplicaManagerTest.testFencedErrorCausedByBecomeLeader`

  Reference commit: https://github.com/niteshmor/kafka/commit/a0d3dc0f2e490d04b324ad72c47cd1363cc9dd6c
- For 2.5, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/f8f68c334132dd293d96d5be1a1c3d307896cefa
- For 2.4, unit/integration tests pass locally. Reference commit: https://github.com/niteshmor/kafka/commit/9c08944c3c793c7733e0971a68dcb580dd4c288f
[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor commented on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-721998495 Test failure seems to be unrelated. `java.nio.file.DirectoryNotEmptyException` in `org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest`
[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on pull request #9103: URL: https://github.com/apache/kafka/pull/9103#issuecomment-722005317 Mvn failure is not related, merging
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383 ## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ## @@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions target.sourceVariable(), target.sourceVariable(; } } else if (target.field().type().isRecords()) { -headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS); -buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})")); +headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS); +buffer.printf("%s;%n", target.assignmentStatement( +String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable(; Review comment: @lbradstreet The current generated JSON does not print the recordSet. When we were serializing a BinaryNode with an empty array, it was still being deserialized as a BinaryNode, so it doesn't break anything. ``` buffer.printf("%s;%n", target.assignmentStatement( String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))", target.sourceVariable(), target.humanReadableName(; ``` So I believe the concern is that I had proposed to change the serialization to an IntNode, but the deserialization still expects a BinaryNode, which I should have also changed. Alternatively, I could fix this by changing the deserialization to expect an IntNode, but I'm in favor of adding a verbose flag.
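The verbose-flag idea discussed on #9526 (emit only the size of the record set by default, and the raw bytes only on request) can be sketched without the generator or Jackson. Everything here is an assumption for illustration: the class and method names are hypothetical, and plain JSON strings stand in for the `IntNode`, `BinaryNode` and `NullNode` values that the generated converters would use.

```java
import java.util.Base64;

public class RecordSetJsonSketch {
    // Returns a JSON fragment for a record set.
    //   null record set   -> JSON null
    //   verbose == false  -> size in bytes as a JSON number
    //   verbose == true   -> bytes as a Base64 JSON string
    static String recordSetToJson(byte[] recordSet, boolean verbose) {
        if (recordSet == null) {
            return "null";
        }
        if (verbose) {
            return "\"" + Base64.getEncoder().encodeToString(recordSet) + "\"";
        }
        return Integer.toString(recordSet.length);
    }

    public static void main(String[] args) {
        byte[] records = {1, 2, 3};
        System.out.println(recordSetToJson(records, false));
        System.out.println(recordSetToJson(records, true));
        System.out.println(recordSetToJson(null, false));
    }
}
```

The point raised in the review still applies to this sketch: a reader of the non-verbose form sees a number where the verbose form has a string, so whatever deserializes the output has to accept both shapes or know which mode was used.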
[GitHub] [kafka] abbccdda merged pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda merged pull request #9103: URL: https://github.com/apache/kafka/pull/9103
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: Just read through the comment below, so I'll keep the `verbose` flag for now, but I'll remove it depending on what we decide below. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517672247 ## File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.snapshot + +import java.nio.ByteBuffer +import java.nio.file.Path +import java.util.{Iterator => JIterator} +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.raft.OffsetAndEpoch +import org.apache.kafka.snapshot.SnapshotReader + +final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader { Review comment: Sounds good. I'll move this code over and implement it in Java. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517699783 ## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ## @@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions target.sourceVariable(), target.sourceVariable(; } } else if (target.field().type().isRecords()) { -headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS); -buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})")); +headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS); +buffer.printf("%s;%n", target.assignmentStatement( +String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable()))); Review comment: Never mind, we can't deserialize `recordSet` to an IntNode because `FetchResponseData` expects `recordSet` to be of type `BaseRecords`, not `int`. What I've done is leave deserialization unchanged and add the verbose flag to serialization.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { Review comment: This comment applies to some of your other observations. At high-level there are 4 use cases that we need to design and implement. Two use cases are for the Raft implementation. Two use cases are for the state machine. ### Raft implementation These types are internal to the raft implementation and don't have to be exposed to the state machine. Leader Use Case The leader needs to be able to send a part of the snapshot over the network. 
Something like this: ```java interface SnapshotReader extends Closeable { long transferTo(long position, long maxBytes, WritableChannel channel); int read(ByteBuffer buffer, long position); } ``` Follower Use Case The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done. ```java interface SnapshotWriter extends Closeable { void append(ByteBuffer buffer); void validate(); void freeze(); } ``` ### State machine implementation These types are exposed to the state machine. Load Snapshot The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it finished loading the snapshot. This will be implemented in a future PR but it could look like this: ```java interface BatchedSnapshotReader extends Iterable>, Closeable { } ``` Generate Snapshot The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done. ```java interface BatchedSnapshotWriter extends Closeable { void append(Iterable records); void freeze(); } ``` `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine. `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code but this is not strictly required. These two types don't have to be `interfaces` if they delegate the IO to `SnapshotWriter` and `SnapshotReader`. What do you think @hachikuji?
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703625 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { + +public OffsetAndEpoch snapshotId(); + +public long sizeInBytes(); + +public Iterator iterator(); + +public int read(ByteBuffer buffer, long position) throws IOException; Review comment: I think I address this in this comment https://github.com/apache/kafka/pull/9512#discussion_r517703078. If you agree let's move the conversation there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703953 ## File path: raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.raft.internals.BatchAccumulator; + +// TODO: Write documentation for this type and all of the methods +final public class BatchedSnapshotWriter implements Closeable { Review comment: I cover some of the motivation for this here: https://github.com/apache/kafka/pull/9512#discussion_r517703078. Let's move the conversation there if you agree. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { Review comment: This comment applies to some of your other observations. At high-level there are 4 use cases that we need to design and implement. Two use cases are for the Raft implementation. Two use cases are for the state machine. ### Raft implementation These types are internal to the raft implementation and don't have to be exposed to the state machine. Leader Use Case The leader needs to be able to send a part of the snapshot over the network. 
Something like this: ```java interface SnapshotReader extends Closeable { long transferTo(long position, long maxBytes, WritableChannel channel); int read(ByteBuffer buffer, long position); } ``` Follower Use Case The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done. ```java interface SnapshotWriter extends Closeable { void append(ByteBuffer buffer); void validate(); void freeze(); } ``` ### State machine implementation These types are exposed to the state machine. Load Snapshot The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it finished loading the snapshot. This will be implemented in a future PR but it could look like this: ```java interface BatchedSnapshotReader extends Iterable>, Closeable { } ``` Generate Snapshot The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done. ```java interface BatchedSnapshotWriter extends Closeable { void append(Iterable records); void freeze(); } ``` ### Notes `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine. `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code but this is not strictly required. These two types don't have to be `interfaces` if they delegate the IO to `SnapshotWriter` and `SnapshotReader`. What do you think @hachikuji?
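To make the "real implementation and a mocked implementation" point concrete, here is a minimal sketch of what an in-memory test double for the follower-side writer might look like. The `SnapshotWriter` shape is copied from the comment above; the class names `InMemorySnapshotSketch` and `InMemorySnapshotWriter`, the `contents()` accessor, and the no-op `validate()` are assumptions made for illustration and are not taken from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.nio.ByteBuffer;

public class InMemorySnapshotSketch {
    // Interface shape taken from the review comment; close() is narrowed so it throws nothing.
    public interface SnapshotWriter extends Closeable {
        void append(ByteBuffer buffer);
        void validate();
        void freeze();
        @Override
        void close();
    }

    // Hypothetical mock: keeps the appended bytes on the heap instead of on disk.
    public static final class InMemorySnapshotWriter implements SnapshotWriter {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        private boolean frozen = false;

        @Override
        public void append(ByteBuffer buffer) {
            if (frozen) {
                throw new IllegalStateException("snapshot is frozen and therefore immutable");
            }
            // Drain the remaining bytes of the caller's buffer into the in-memory store.
            byte[] chunk = new byte[buffer.remaining()];
            buffer.get(chunk);
            bytes.write(chunk, 0, chunk.length);
        }

        @Override
        public void validate() {
            // Assumption: the mock has nothing to verify; a real writer might check the data on disk.
        }

        @Override
        public void freeze() {
            frozen = true;
        }

        @Override
        public void close() {
            // Nothing to release for an in-memory buffer.
        }

        // Test-only accessor (hypothetical) for asserting on what was written.
        public byte[] contents() {
            return bytes.toByteArray();
        }
    }

    public static void main(String[] args) {
        InMemorySnapshotWriter writer = new InMemorySnapshotWriter();
        writer.append(ByteBuffer.wrap(new byte[] {1, 2, 3}));
        writer.validate();
        writer.freeze();
        System.out.println(writer.contents().length); // prints 3
        writer.close();
    }
}
```

Because the state-machine-facing types would only delegate IO to this interface, a test could swap the mock in without touching them.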
[GitHub] [kafka] abbccdda opened a new pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding
abbccdda opened a new pull request #9558: URL: https://github.com/apache/kafka/pull/9558 This PR follows up on https://github.com/apache/kafka/commit/0814e4f645880a3c63102fc197c8912c63846ad5 to migrate the remaining RPCs which need forwarding: - CreateAcls - DeleteAcls - CreateDelegationToken - RenewDelegationToken - ExpireDelegationToken - AlterPartitionReassignment - CreatePartition - DeleteTopics - UpdateFeatures - Scram We also refactored the `KafkaApisTest` to make it easier to add new unit tests for the forwarding path of new RPCs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] abbccdda opened a new pull request #9559: HOTFIX: RequestContext constructor change
abbccdda opened a new pull request #9559: URL: https://github.com/apache/kafka/pull/9559 Hit an unfortunate merge conflict at the same time with https://github.com/apache/kafka/commit/5df8457e05f6808145e90f5637d7f8a4aed548d9 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-10181) Create Envelope RPC and redirection template for configuration change RPCs
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10181. - Resolution: Fixed > Create Envelope RPC and redirection template for configuration change RPCs > -- > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.8.0 > > > In the bridge release broker, > AlterConfig/IncrementalAlterConfig/CreateTopics/AlterClientQuota should be > redirected to the active controller. This ticket will ensure those RPCs get > redirected. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda opened a new pull request #9560: KAFKA-10345: Add ZK-notification based update for trust/key store paths
abbccdda opened a new pull request #9560: URL: https://github.com/apache/kafka/pull/9560 Updates to the SSL trust store and key store paths can no longer go through the direct per-broker update path because of forwarding. We need to add a mechanism to trigger the update through a ZK notification. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-10624: Assignee: Kowshik Prakasam > [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration > > > Key: KAFKA-10624 > URL: https://issues.apache.org/jira/browse/KAFKA-10624 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Minor > > In Scala, we prefer sealed traits over Enumeration since the former gives you > exhaustiveness checking. With Scala Enumeration, you don't get a warning if > you add a new value that is not handled in a given pattern match. > This Jira tracks refactoring enum > [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] > from an enum to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kowshik opened a new pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
kowshik opened a new pull request #9561: URL: https://github.com/apache/kafka/pull/9561 In this PR, I've switched the `FeatureZNodeStatus` enum to be a sealed trait. In Scala, we prefer sealed traits over Enumeration since the former gives you exhaustiveness checking. With Scala enumeration, you don't get a warning if you add a new value that is not handled in a given pattern match. **Test plan:** Rely on existing tests.
[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
kowshik commented on pull request #9561: URL: https://github.com/apache/kafka/pull/9561#issuecomment-722097335 @junrao @abbccdda: this PR is ready for review.
[GitHub] [kafka] kowshik opened a new pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow
kowshik opened a new pull request #9562: URL: https://github.com/apache/kafka/pull/9562 Fixed the param doc in `FinalizedFeatureChangeListener.initOrThrow` method. The parameter `waitOnceForCacheUpdateMs` is expected to be > 0, but the doc was incorrect.
[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
kowshik commented on pull request #9561: URL: https://github.com/apache/kafka/pull/9561#issuecomment-722113042 The build in CI is broken due to https://github.com/apache/kafka/pull/9559 . I'll rebase on top of latest AK trunk, after that PR is merged.
[GitHub] [kafka] kowshik commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow
kowshik commented on pull request #9562: URL: https://github.com/apache/kafka/pull/9562#issuecomment-722119275 The build in CI is broken due to #9559 . I'll rebase on top of latest AK trunk, after that PR is merged.
[GitHub] [kafka] kowshik edited a comment on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow
kowshik edited a comment on pull request #9562: URL: https://github.com/apache/kafka/pull/9562#issuecomment-722119275 The build in CI is broken due to the absence of #9559. I'll rebase on top of latest AK trunk, after that PR is merged.
[GitHub] [kafka] kowshik edited a comment on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration
kowshik edited a comment on pull request #9561: URL: https://github.com/apache/kafka/pull/9561#issuecomment-722113042 The build in CI is broken due to the absence of https://github.com/apache/kafka/pull/9559. I'll rebase on top of latest AK trunk, after that PR is merged.
[GitHub] [kafka] chia7712 merged pull request #9559: HOTFIX: RequestContext constructor change
chia7712 merged pull request #9559: URL: https://github.com/apache/kafka/pull/9559
[GitHub] [kafka] chia7712 commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow
chia7712 commented on pull request #9562: URL: https://github.com/apache/kafka/pull/9562#issuecomment-722124176 @kowshik I have merged #9559. Could you rebase the PR to trigger QA again?
[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-722128511 @junrao Thanks for your response. Fixed.
[GitHub] [kafka] kowshik commented on pull request #9562: MINOR: Fix param doc in FinalizedFeatureChangeListener.initOrThrow
kowshik commented on pull request #9562: URL: https://github.com/apache/kafka/pull/9562#issuecomment-722130040 @chia7712 I've rebased now on top of #9559. Hopefully the build will go green. Thank you for the review!
[jira] [Created] (KAFKA-10684) Avoid additional copies in envelope path when transmitting over network
Jason Gustafson created KAFKA-10684: --- Summary: Avoid additional copies in envelope path when transmitting over network Key: KAFKA-10684 URL: https://issues.apache.org/jira/browse/KAFKA-10684 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson When we send an envelope request or response, we first allocate a buffer for the embedded data. When we are ready to transmit the data, we allocate a new buffer for the full envelope and copy the embedded data to it. We can skip the second copy if we are a little smarter when translating the envelope data to the network `Send` object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji opened a new pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji opened a new pull request #9563: URL: https://github.com/apache/kafka/pull/9563 This patch creates a new `SendBuilder` class which allows us to avoid copying "bytes" types when transmitting an api message over the network. This is used in `EnvelopeRequest` and `EnvelopeResponse` to avoid copying the embedded data. The patch also contains a few minor cleanups such as moving envelope parsing logic into `RequestContext`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
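The general idea of skipping the second copy can be illustrated with a plain NIO gathering write. The sketch below is not the `SendBuilder` from the patch; the class name `GatheringSendSketch`, its methods, and the 4-byte length-prefix framing are all hypothetical and were chosen only to keep the example small. The header is written into its own small buffer, and the payload buffer is handed to the channel as-is rather than being copied into a combined buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class GatheringSendSketch {
    // Hypothetical framing: [4-byte big-endian length][payload]. Only the header is newly allocated.
    public static ByteBuffer[] frame(ByteBuffer payload) {
        ByteBuffer header = ByteBuffer.allocate(Integer.BYTES);
        header.putInt(payload.remaining());
        header.flip();
        // duplicate() shares the payload's memory; only position and limit are independent.
        return new ByteBuffer[] {header, payload.duplicate()};
    }

    // Writes every buffer in order with gathering writes, looping until nothing remains.
    public static long writeFully(GatheringByteChannel channel, ByteBuffer[] buffers) {
        long total = 0;
        for (ByteBuffer buffer : buffers) {
            total += buffer.remaining();
        }
        long written = 0;
        try {
            while (written < total) {
                written += channel.write(buffers);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    // Sends a three-byte payload to a temporary file and returns the bytes that were written.
    public static byte[] demo() {
        try {
            Path file = Files.createTempFile("envelope-sketch", ".bin");
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                writeFully(channel, frame(ByteBuffer.wrap(new byte[] {10, 20, 30})));
            }
            byte[] bytes = Files.readAllBytes(file);
            Files.delete(file);
            return bytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [0, 0, 0, 3, 10, 20, 30]
    }
}
```

A file channel stands in for the socket here only so the result can be read back; any `GatheringByteChannel` would be driven the same way.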
[jira] [Created] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups getting interpreted as a timezone
Russell Sayers created KAFKA-10685:

Summary: --to-datetime passed to kafka-consumer-groups getting interpreted as a timezone
Key: KAFKA-10685
URL: https://issues.apache.org/jira/browse/KAFKA-10685
Project: Kafka
Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Russell Sayers

If you pass more than 3 decimal places for the fractional seconds of the datetime, the entire fractional part is parsed as a count of milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{ --reset-offsets \}}
{{ --group webserver-avro \}}
{{ --topic driver-positions-avro \}}
{{ --to-datetime "2020-11-05T00:46:48.002237400" \}}
{{ --dry-run}}

Relevant code [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].

Experimenting with getDateTime:
* getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
* getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct; the formatting string allows for ZZZ timezones
* getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS", so SimpleDateFormat interprets "000123" as 123 milliseconds. See the stackoverflow answer [here|https://stackoverflow.com/a/21235602/109102].

The fix? Either drop any digits beyond the third after the decimal point, or raise an exception. The code would still need to allow the RFC822 timezone, i.e. Sign TwoDigitHours Minutes.
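The behaviour reported in KAFKA-10685 can be reproduced outside Kafka with a short sketch. The code below is not Kafka's `getDateTime`; the class `FractionalSecondsSketch` and both of its methods are hypothetical, the pattern is assumed to be "yyyy-MM-dd'T'HH:mm:ss.SSS", and the time zone is assumed to be UTC. It shows the lenient parse and one possible form of the truncation fix suggested in the issue.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.regex.Pattern;

// Hypothetical illustration only; this is not the code in Kafka's Utils class.
final class FractionalSecondsSketch {

    // Matches a fractional part longer than three digits and captures the first three.
    private static final Pattern LONG_FRACTION = Pattern.compile("(\\.\\d{3})\\d+");

    // Parses the timestamp as UTC. With the default lenient setting, every digit
    // after the decimal point is read as one millisecond count.
    static long parseUtc(String timestamp) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return format.parse(timestamp).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse timestamp: " + timestamp, e);
        }
    }

    // Possible fix: keep only millisecond precision before parsing.
    // Any timezone suffix after the digits is left in place.
    static String truncateToMillis(String timestamp) {
        return LONG_FRACTION.matcher(timestamp).replaceFirst("$1");
    }

    public static void main(String[] args) {
        String input = "2020-11-05T00:46:48.002237400";
        System.out.println(parseUtc(input));                   // fraction read as 2237400 ms
        System.out.println(parseUtc(truncateToMillis(input))); // fraction read as 2 ms
    }
}
```

Truncating is the more forgiving of the two options named in the issue; rejecting the input with an exception would be the stricter alternative.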
[jira] [Updated] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
[ https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Russell Sayers updated KAFKA-10685:

Summary: --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong (was: --to-datetime passed to kafka-consumer-groups getting interpreted as a timezone)

> Key: KAFKA-10685
> URL: https://issues.apache.org/jira/browse/KAFKA-10685
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.7.0
> Reporter: Russell Sayers
> Priority: Minor
[jira] [Updated] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
[ https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Russell Sayers updated KAFKA-10685:

Description:

If you pass more than 3 decimal places for the fractional seconds of the datetime, the entire fractional part is parsed as a count of milliseconds.

{{kafka-consumer-groups --bootstrap-server kafka:9092 \}}
{{--reset-offsets \}}
{{--group webserver-avro \}}
{{--topic driver-positions-avro \}}
{{--to-datetime "2020-11-05T00:46:48.002237400" \}}
{{--dry-run}}

Relevant code [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].

The datetime is being turned into Nov 5, 2020 1:24:05.400 because SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.

Experimenting with getDateTime:
* getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
* getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct; the formatting string allows for ZZZ timezones
* getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this ends with 123 milliseconds.

The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS", so SimpleDateFormat interprets "000123" as 123 milliseconds. See the stackoverflow answer [here|https://stackoverflow.com/a/21235602/109102].

The fix? Either drop any digits beyond the third after the decimal point, or raise an exception. The code would still need to allow the RFC822 timezone, i.e. Sign TwoDigitHours Minutes.
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226507#comment-17226507 ]

Sarita commented on KAFKA-7500:

I was able to fix the bootstrap-server disconnect issue. After going through all the logs, I noticed that the SASL mechanism used by the producer, consumer, admin client, and MirrorMaker connectors was not the same. I had to set the SASL mechanism manually for each of these configs to fix the issue. The worker and connector configs are below.

Worker configs are as follows:
```
bootstrap.servers=abc-broker-1:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="abc-broker-superuser" password="abc-broker-superuser-password";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="abc-broker-superuser" password="abc-broker-superuser-password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
```
Connector configs:
```
{ "name": "MM9", "config": { "connector.class":
"org.apache.kafka.connect.mirror.MirrorSourceConnector", "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xyz-broker-superuser' password='xyz-broker-superuser-password';", "errors.log.include.messages": "true", "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='abc-broker-superuser' password='abc-broker-superuser-password';", "sync.topic.acls.enabled": "false", "tasks.max": "3", "source.cluster.producer.security.protocol": "SASL_SSL", "emit.checkpoints.interval.seconds": "1", "source.cluster.alias": "xyz-broker", "target.cluster.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='abc-broker-superuser' password='abc-broker-superuser-password';", "source.cluster.producer.sasl.mechanism": "PLAIN", "target.cluster.producer.bootstrap.servers": "abc-broker-3:9093,abc-broker-2:9093", "enabled": "true", "target.cluster.admin.bootstrap.servers": "abc-broker-3:9093,abc-broker-2:9093", "target.cluster.producer.security.protocol": "SASL_SSL", "target.cluster.security.protocol": "SASL_SSL", "target.cluster.consumer.sasl.mechanism": "PLAIN", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "errors.log.enable": "true", "source.cluster.admin.bootstrap.servers": "xyz-broker-1:9093,xyz-broker-2:9093", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "clusters": "xyz-broker, abc-broker", "source.cluster.producer.bootstrap.servers": "xyz-broker-1:9093,xyz-broker-2:9093", "target.cluster.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='abc-broker-superuser' password='abc-broker-superuser-password';", "producer.security.protocol": "SASL_SSL", "topics": "messaging_ops_mm8", "source.cluster.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xyz-broker-superuser' 
password='xyz-broker-superuser-password';", "target.cluster.sasl.mechanism": "PLAIN", "source.cluster.consumer.security.protocol": "SASL_SSL", "groups": "consumer-group-.*", "source.cluster.consumer.sasl.mechanism": "PLAIN", "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='xyz-broker-superuser' password='xyz-broker-superuser-password';", "source.cluster.bootstrap.servers": "xyz-broker-1:9093,xyz-broker-2:9093", "source.cluster.sasl.mechanism": "PLAIN", "producer.sasl.mechanism": "PLAIN", "target.cluster.alias": "abc-broker", "target.cluster.consumer.security.protocol": "SASL_SSL", "task.class": "org.apache.kafka.connect.mirror.MirrorSourceTask", "target.cluster.consumer.bootstrap.servers": "abc-broker-3:9093,abc-broker-2:9093", "name": "MM9", "target.cluster.bootstrap.servers": "abc-broker-3:9093,abc-broker-2:9093", "emit.hear
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651 ]

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:11 AM:

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. Worker configs are as follows:

bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks

When I run the command below, I can see the worker connector created:

```nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &```

When I try to create a connector using a POST call, I start seeing this message:

```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)```

The JSON content for the POST call is as follows:

```
{
  "name": "MM9",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": 3,
    "topics": "messaging_ops_mm8",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "clusters": "xyz-broker, abc-broker",
    "source.cluster.alias": "xyz-broker",
    "target.cluster.alias": "abc-broker",
    "source.cluster.bootstrap.servers": "xyz-broker:9093",
    "source.cluster.security.protocol": "SASL_SSL",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser_password';",
    "source.cluster.ssl.truststore.password": "truststore_password",
    "source.cluster.ssl.truststore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",
    "source.cluster.ssl.keystore.password": "keystore_password",
    "source.cluster.ssl.keystore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks",
    "target.cluster.bootstrap.servers": "abc-broker:9093",
    "target.cluster.security.protocol": "SASL_SSL",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser_password';",
    "target.cluster.ssl.truststore.password": "truststore_password",
    "target.cluster.ssl.truststore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks",
    "target.cluster.ssl.keystore.password": "keystore_password",
    "target.cluster.ssl.keystore.location": "/opt/projects/confluent/ssl/wildcard.kafka.iggroup.local.jks"
  }
}
```

Note that the bootstrap server in the connect-distributed.properties file and the target cluster bootstrap server are the same (abc-broker). I have added all the SASL credentials and tried swapping the source and destination clusters in the JSON, but I continue to get this broker-disconnected warning. We are on Kafka version 0.11.0.3. What are we missing?
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651 ]

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:13 AM:

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. The worker configs are as follows:

{quote}
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
{quote}

When I run the command below, I can see the worker connector created:

{quote}
```nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &```
{quote}

When I try to create a connector using a POST call, I start seeing this message:

{quote}
```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)```
{quote}

The JSON content for the POST call is as below:

{quote}
```
"name": "MM9",
"config": Unknown macro: \{ "connector.class"} }
```
{quote}

Note that the bootstrap server in the connect-distributed.properties file and the target cluster bootstrap server are the same (abc-broker). I have added all the SASL credentials and tried swapping the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. We are on Kafka version 0.11.0.3. What are we missing?

> MirrorMaker 2.0 (KIP-382)
> -
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224651#comment-17224651 ]

Sarita edited comment on KAFKA-7500 at 11/5/20, 6:24 AM:

Hi [~ryannedolan]

We are trying to set up MM2 with a Connect distributed setup. The worker configs are as follows:

{quote}
bootstrap.servers=abc-broker:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="superuser" password="superuser_password";
group.id=connect-tails
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-test
config.storage.topic=connect-configs-test
status.storage.topic=connect-status-test
offset.flush.interval.ms=30
producer.buffer.memory=1234
producer.ssl.truststore.password=truststore_password
producer.ssl.truststore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
producer.ssl.keystore.password=keystore_password
producer.ssl.keystore.location=/opt/projects/confluent/wildcard.kafka.iggroup.local.jks
{quote}

When I run the command below, I can see the worker connector created:

{quote}
```nohup sh connect-distributed ../etc/kafka/connect-distributed-1.properties &```
{quote}

When I try to create a connector using a POST call, I start seeing this message:

{quote}
```WARN [Producer clientId=connector-producer-MM9-0] Bootstrap broker abc-broker:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)```
{quote}

The JSON content for the POST call is as below:

{quote}
{
  "name": "MM9",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": 3,
    "topics": "messaging_ops_mm8",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "clusters": "peach, tails",
    "source.cluster.alias": "peach",
    "target.cluster.alias": "tails",
    "source.cluster.bootstrap.servers": "xyz-broker:9093",
    "source.cluster.producer.bootstrap.servers": "xyz-broker:9093",
    "sync.topic.acls.enabled": "false",
    "emit.checkpoints.interval.seconds": "1",
    "groups": "consumer-group-.*",
    "source.cluster.admin.bootstrap.servers": "xyz-broker:9093",
    "source.cluster.consumer.bootstrap.servers": "xyz-broker:9093",
    "source.cluster.security.protocol": "SASL_SSL",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser-password';",
    "source.cluster.ssl.truststore.password": "truststorepassword",
    "source.cluster.ssl.truststore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",
    "source.cluster.ssl.keystore.password": "keystorepassword",
    "source.cluster.ssl.keystore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",
    "target.cluster.bootstrap.servers": "abc-broker:9093",
    "target.cluster.producer.bootstrap.servers": "abc-broker:9093",
    "enabled": "true",
    "target.cluster.admin.bootstrap.servers": "abc-broker:9093",
    "target.cluster.consumer.bootstrap.servers": "abc-broker:9093",
    "name": "MirrorSourceConnector",
    "emit.heartbeats.interval.seconds": "1",
    "target.cluster.security.protocol": "SASL_SSL",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='superuser' password='superuser-password';",
    "target.cluster.ssl.truststore.password": "truststorepassword",
    "target.cluster.ssl.truststore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks",
    "target.cluster.ssl.keystore.password": "keystore-password",
    "target.cluster.ssl.keystore.location": "/opt/projects/confluent/wildcard.kafka.iggroup.local.jks"
  }
}
{quote}

Note that the bootstrap server in the connect-distributed.properties file and the target cluster bootstrap server are the same (abc-broker). I have added all the SASL credentials and tried swapping the source and destination clusters in the JSON, but I continue to get this broker disconnected WARNING. We are on Kafka version 0.11.0.3. What are we missing?
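For readers following the POST call described above: the Kafka Connect REST API creates a connector from a JSON object with a top-level `name` and a `config` map, sent to `/connectors` on a worker. The following is only a minimal sketch of building such a request with the JDK 11 HTTP client. The class name `ConnectorRequestSketch`, its helper methods, and the worker URL are assumptions made for illustration, and the sketch does not address the disconnect warning itself.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Kafka: builds the body and request for POST /connectors.
final class ConnectorRequestSketch {

    // Quotes a string for JSON; only backslashes and double quotes are escaped here,
    // which is enough for typical config values but not for control characters.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Produces {"name": ..., "config": {...}} with every config value sent as a string.
    static String toPayload(String name, Map<String, String> config) {
        StringJoiner entries = new StringJoiner(", ", "{", "}");
        config.forEach((key, value) -> entries.add(quote(key) + ": " + quote(value)));
        return "{\"name\": " + quote(name) + ", \"config\": " + entries + "}";
    }

    // Builds (but does not send) the request; workerUrl is an assumed worker address
    // such as http://localhost:8083.
    static HttpRequest createRequest(String workerUrl, String payload) {
        return HttpRequest.newBuilder(URI.create(workerUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the response body usually states why a configuration was rejected.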
[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r517822478

## File path: clients/src/main/resources/common/message/EnvelopeRequest.json ##
@@ -23,7 +23,7 @@
 "fields": [
 { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true, "about": "The embedded request header and data."},
-{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", "zeroCopy": true, "nullableVersions": "0+",
+{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", "nullableVersions": "0+",

Review comment: Is ```nullableVersions``` required? It seems there is no null handling or null check for this field in production code.
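As background for the question about ```nullableVersions```: a nullable bytes field needs some way to represent null on the wire. One convention, used by Kafka's non-compact NULLABLE_BYTES primitive, is an INT32 length prefix where -1 means null. The sketch below illustrates only that convention; the class `NullableBytesSketch` is hypothetical and is not the code generated for EnvelopeRequest, which may use a different (compact) encoding.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of a length-prefixed nullable bytes encoding.
final class NullableBytesSketch {

    // Writes a 4-byte length followed by the bytes, or a length of -1 for null.
    static void writeNullableBytes(ByteBuffer out, byte[] value) {
        if (value == null) {
            out.putInt(-1);
            return;
        }
        out.putInt(value.length);
        out.put(value);
    }

    // Reads the length prefix and returns null when the length is negative.
    static byte[] readNullableBytes(ByteBuffer in) {
        int length = in.getInt();
        if (length < 0) {
            return null;
        }
        byte[] value = new byte[length];
        in.get(value);
        return value;
    }
}
```

If a field is declared nullable, every reader has to be prepared for the null branch, which is why the absence of null checks in production code is worth asking about.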
[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r517824324

## File path: clients/src/main/java/org/apache/kafka/common/network/SendBuilder.java ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class provides a way to build {@link Send} objects for network
+ * transmission from generated {@link org.apache.kafka.common.protocol.ApiMessage}
+ * types. Its main advantage over direct {@link ByteBuffer} allocation based on
+ * {@link org.apache.kafka.common.protocol.ApiMessage#size(ObjectSerializationCache, short)}
+ * is that it avoids copying "bytes" fields. The downside is that it is up to the caller
+ * to allocate a buffer which accounts only for the additional request overhead.
+ *
+ * See {@link org.apache.kafka.common.requests.EnvelopeRequest#toSend(String, RequestHeader)}
+ * for example usage.
+ */
+public class SendBuilder implements Writable {
+    private final List buffers = new ArrayList<>();
+    private final ByteBuffer buffer;
+
+    public SendBuilder(ByteBuffer buffer) {

Review comment: The ```buffer``` is used and owned only by ```SendBuilder```. It seems to me this constructor should accept a capacity (an ```int```) rather than a ```ByteBuffer```, so that ```SendBuilder``` creates the ```ByteBuffer``` itself during construction.
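The suggestion in the review comment above can be sketched roughly as follows. This is a hypothetical stand-in, not the actual ```SendBuilder```: the class name `CapacityOwningBuilder` and its methods are invented for illustration, and the only point is that the constructor takes an ```int``` capacity and the builder allocates and owns the ```ByteBuffer```.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the builder under review; it is not Kafka's SendBuilder.
final class CapacityOwningBuilder {
    // Allocated here, so no caller can hold a reference to the writable buffer.
    private final ByteBuffer buffer;

    // The caller states only how many bytes of request overhead are needed.
    CapacityOwningBuilder(int overheadCapacity) {
        if (overheadCapacity < 0) {
            throw new IllegalArgumentException("capacity must be non-negative: " + overheadCapacity);
        }
        this.buffer = ByteBuffer.allocate(overheadCapacity);
    }

    // Appends a 4-byte big-endian integer and returns this builder for chaining.
    CapacityOwningBuilder writeInt(int value) {
        buffer.putInt(value);
        return this;
    }

    // Returns a read-only view of the bytes written so far without
    // disturbing the builder's own write position.
    ByteBuffer written() {
        ByteBuffer view = buffer.duplicate();
        view.flip();
        return view.asReadOnlyBuffer();
    }
}
```

The trade-off is that the caller still has to compute the overhead size, but it can no longer pass a buffer that is shared, already positioned, or too small for reasons the builder cannot see.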
[GitHub] [kafka] chia7712 commented on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on pull request #9563: URL: https://github.com/apache/kafka/pull/9563#issuecomment-722184706 Does this improvement work for other requests?