[jira] [Commented] (KAFKA-6972) Kafka ACL does not work as expected with wildcard
[ https://issues.apache.org/jira/browse/KAFKA-6972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499338#comment-16499338 ] Soyee Deng commented on KAFKA-6972: --- Hi [~sliebau], Sorry, my fault. It works as you mentioned. Thanks for helping me out. > Kafka ACL does not work as expected with wildcard > -- > > Key: KAFKA-6972 > URL: https://issues.apache.org/jira/browse/KAFKA-6972 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.11.0.0 > Environment: OS : CentOS 7, 64bit. > Confluent : 3.3, Kafka 0.11. >Reporter: Soyee Deng >Assignee: Sönke Liebau >Priority: Major > > I just started with the Confluent 3.3 platform and Kafka 0.11, using SSL for transport security and Kerberos to restrict access based on the principals held. To make life easier, wildcards are used extensively in my environment, but it turned out they were not working as expected. > My issue is that when I run the _kafka-acls_ command from a directory containing some files, the command picks up the name of the first file as the topic name or group name. E.g. in my case, abcd.txt was chosen while I was giving my principal connect-consumer permission to consume messages from any topic with any group id. > [quality@data-pipeline-1 test_dir]$ > KAFKA_OPTS=-Djava.security.auth.login.config='/etc/security/jaas/broker-jaas.conf' > kafka-acls --authorizer-properties > zookeeper.connect=data-pipeline-1.orion.com:2181 --add --allow-principal > User:connect-consumer --consumer --topic * --group * > Adding ACLs for resource `Topic:abcd.txt`: > User:connect-consumer has Allow permission for operations: Describe from > hosts: * > User:connect-consumer has Allow permission for operations: Read from hosts: * > Adding ACLs for resource `Group:abcd.txt`: > User:connect-consumer has Allow permission for operations: Read from hosts: * > Current ACLs for resource `Topic:abcd.txt`: > User:connect-consumer has Allow permission for operations: Describe from > hosts: * > User:connect-consumer has Allow permission for operations: Read from hosts: * > User:connect-consumer has Allow permission for operations: Write from hosts: > * > Current ACLs for resource `Group:abcd.txt`: > User:connect-consumer has Allow permission for operations: Read from hosts: * > > My current workaround is to change to an empty directory and run the above command; it then works as expected. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5523) ReplayLogProducer not using the new Kafka consumer
[ https://issues.apache.org/jira/browse/KAFKA-5523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499343#comment-16499343 ] ASF GitHub Bot commented on KAFKA-5523: --- ijuma closed pull request #5092: KAFKA-5523: Remove ReplayLogProducer tool URL: https://github.com/apache/kafka/pull/5092 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-replay-log-producer.sh b/bin/kafka-replay-log-producer.sh deleted file mode 100755 index bba3241d75f..000 --- a/bin/kafka-replay-log-producer.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer "$@" diff --git a/bin/windows/kafka-replay-log-producer.bat b/bin/windows/kafka-replay-log-producer.bat deleted file mode 100644 index 7b51302a005..000 --- a/bin/windows/kafka-replay-log-producer.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -"%~dp0kafka-run-class.bat" kafka.tools.ReplayLogProducer %* diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala deleted file mode 100644 index ca9c63a..000 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import joptsimple.OptionParser -import java.util.concurrent.CountDownLatch -import java.util.Properties -import kafka.consumer._ -import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils} -import kafka.api.OffsetRequest -import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} -import scala.collection.JavaConverters._ - -object ReplayLogProducer extends Logging { - - private val GroupId: String = "replay-log-producer" - - def main(args: Array[String]) { -val config = new Config(args) - -// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack -ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId) -Thread.sleep(500) - -// consumer properties -val consumerProps = new Properties -consumerProps.put("group.id", GroupId) -consumerProps.put("zookeeper.connect", config.zkConnect) -consumerProps.put("consumer.timeout.ms", "1") -consumerProp
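The removed tool simply re-read an input topic and re-published its records; with the old Scala consumer gone, the same loop is straightforward with the current Java clients. A minimal sketch, assuming byte-array topics named "source" and "target" and a broker at localhost:9092 (all illustrative, not part of the removed tool):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReplayTopic {
    public static void main(String[] args) {
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092"); // illustrative address
        c.put("group.id", "replay-log-producer");
        c.put("auto.offset.reset", "earliest");       // replay from the beginning
        c.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(c);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p)) {
            consumer.subscribe(Collections.singletonList("source"));
            while (true) {
                // Poll the source topic and re-publish every record to the target topic.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> r : records)
                    producer.send(new ProducerRecord<>("target", r.key(), r.value()));
            }
        }
    }
}
{code}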
[jira] [Comment Edited] (KAFKA-6972) Kafka ACL does not work as expected with wildcard
[ https://issues.apache.org/jira/browse/KAFKA-6972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499338#comment-16499338 ] Soyee Deng edited comment on KAFKA-6972 at 6/3/18 8:58 AM: --- Hi [~sliebau], Sorry, my fault. It works as you mentioned. Thanks for helping me out. It would probably be better to document the difference between the plain and the quoted wildcard. was (Author: soyee): Hi [~sliebau], Sorry, my fault. It works as you mentioned. Thanks for helping me out. > Kafka ACL does not work as expected with wildcard > -- > > Key: KAFKA-6972 > URL: https://issues.apache.org/jira/browse/KAFKA-6972 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.11.0.0 > Environment: OS : CentOS 7, 64bit. > Confluent : 3.3, Kafka 0.11. >Reporter: Soyee Deng >Assignee: Sönke Liebau >Priority: Major > > I just started with the Confluent 3.3 platform and Kafka 0.11, using SSL for transport security and Kerberos to restrict access based on the principals held. To make life easier, wildcards are used extensively in my environment, but it turned out they were not working as expected. > My issue is that when I run the _kafka-acls_ command from a directory containing some files, the command picks up the name of the first file as the topic name or group name. E.g. in my case, abcd.txt was chosen while I was giving my principal connect-consumer permission to consume messages from any topic with any group id. > [quality@data-pipeline-1 test_dir]$ > KAFKA_OPTS=-Djava.security.auth.login.config='/etc/security/jaas/broker-jaas.conf' > kafka-acls --authorizer-properties > zookeeper.connect=data-pipeline-1.orion.com:2181 --add --allow-principal > User:connect-consumer --consumer --topic * --group * > Adding ACLs for resource `Topic:abcd.txt`: > User:connect-consumer has Allow permission for operations: Describe from > hosts: * > User:connect-consumer has Allow permission for operations: Read from hosts: * > Adding ACLs for resource `Group:abcd.txt`: > User:connect-consumer has Allow permission for operations: Read from hosts: * > Current ACLs for resource `Topic:abcd.txt`: > User:connect-consumer has Allow permission for operations: Describe from > hosts: * > User:connect-consumer has Allow permission for operations: Read from hosts: * > User:connect-consumer has Allow permission for operations: Write from hosts: > * > Current ACLs for resource `Group:abcd.txt`: > User:connect-consumer has Allow permission for operations: Read from hosts: * > > My current workaround is to change to an empty directory and run the above command; it then works as expected. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
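For readers hitting the same problem: the root cause is shell globbing, not the authorizer. An unquoted * is expanded by the shell against the files in the current directory before kafka-acls ever sees it, which is why abcd.txt was picked up. Quoting the wildcard keeps it literal (connection string and principal copied from the report above):

{code:bash}
# Unquoted * is glob-expanded by the shell; quoted '*' reaches kafka-acls intact.
kafka-acls --authorizer-properties zookeeper.connect=data-pipeline-1.orion.com:2181 \
  --add --allow-principal User:connect-consumer --consumer --topic '*' --group '*'
{code}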
[jira] [Commented] (KAFKA-6976) Kafka Streams instances going into DEAD state
[ https://issues.apache.org/jira/browse/KAFKA-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499347#comment-16499347 ] Deepak Goyal commented on KAFKA-6976: - [^kafkaStreamsDeadState.log] The attached logs are at DEBUG level. At line number 64017 you'll be able to see the client going into the DEAD state. Also, please note that the logs are from an instance which served as the application leader for Kafka Streams. > Kafka Streams instances going into DEAD state > -- > > Key: KAFKA-6976 > URL: https://issues.apache.org/jira/browse/KAFKA-6976 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Deepak Goyal >Priority: Blocker > > We are using Kafka 0.10.2.0 and Kafka Streams 1.1.0. We have a Kafka cluster of 16 machines, and the topic being consumed by Kafka Streams has 256 partitions. We spawned the Kafka Streams application on 400 machines. We see that all of the StreamThreads go into DEAD state. > {quote}{{[2018-05-25 05:59:29,282] INFO stream-thread > [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] State transition > from PENDING_SHUTDOWN to DEAD > (org.apache.kafka.streams.processor.internals.StreamThread) [2018-05-25 > 05:59:29,282] INFO stream-client [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] > State transition from REBALANCING to ERROR > (org.apache.kafka.streams.KafkaStreams) [2018-05-25 05:59:29,282] WARN > stream-client [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] All stream threads > have died. The instance will be in error state and should be closed. > (org.apache.kafka.streams.KafkaStreams) [2018-05-25 05:59:29,282] INFO > stream-thread [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] > Shutdown complete > (org.apache.kafka.streams.processor.internals.StreamThread)}} > {quote} > Please note that when we have only 100 Kafka Streams application machines, things work as expected: the instances consume messages from the topic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
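Whatever the root cause, an application can at least observe this transition programmatically instead of discovering it in the logs. A minimal sketch against the 1.1 client API, assuming streams is the already-built KafkaStreams instance (the reaction shown is illustrative):

{code:java}
import org.apache.kafka.streams.KafkaStreams;

// ... assuming `streams` is an already-built KafkaStreams instance:
streams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.ERROR) {
        // All stream threads have died; the instance must be closed and replaced.
        System.err.println("Streams went " + oldState + " -> " + newState);
    }
});
streams.setUncaughtExceptionHandler((thread, throwable) ->
    System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
streams.start();
{code}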
[jira] [Updated] (KAFKA-6976) Kafka Streams instances going into DEAD state
[ https://issues.apache.org/jira/browse/KAFKA-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak Goyal updated KAFKA-6976: Attachment: kafkaStreamsDeadState.log > Kafka Streams instances going into DEAD state > -- > > Key: KAFKA-6976 > URL: https://issues.apache.org/jira/browse/KAFKA-6976 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Deepak Goyal >Priority: Blocker > Attachments: kafkaStreamsDeadState.log > > > We are using Kafka 0.10.2.0 and Kafka Streams 1.1.0. We have a Kafka cluster of 16 machines, and the topic being consumed by Kafka Streams has 256 partitions. We spawned the Kafka Streams application on 400 machines. We see that all of the StreamThreads go into DEAD state. > {quote}{{[2018-05-25 05:59:29,282] INFO stream-thread > [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] State transition > from PENDING_SHUTDOWN to DEAD > (org.apache.kafka.streams.processor.internals.StreamThread) [2018-05-25 > 05:59:29,282] INFO stream-client [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] > State transition from REBALANCING to ERROR > (org.apache.kafka.streams.KafkaStreams) [2018-05-25 05:59:29,282] WARN > stream-client [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] All stream threads > have died. The instance will be in error state and should be closed. > (org.apache.kafka.streams.KafkaStreams) [2018-05-25 05:59:29,282] INFO > stream-thread [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] > Shutdown complete > (org.apache.kafka.streams.processor.internals.StreamThread)}} > {quote} > Please note that when we have only 100 Kafka Streams application machines, things work as expected: the instances consume messages from the topic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
[ https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499505#comment-16499505 ] ASF GitHub Bot commented on KAFKA-6925: --- guozhangwang closed pull request #5120: KAFKA-6925: fix parentSensors memory leak (#5108) URL: https://github.com/apache/kafka/pull/5120 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index ea85b749f88..7f269e04c4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -216,6 +216,7 @@ public void removeSensor(Sensor sensor) { parent = parentSensors.get(sensor); if (parent != null) { metrics.removeSensor(parent.name()); +parentSensors.remove(sensor); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java index 0e87a6d3858..0f91ae15529 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java @@ -19,11 +19,15 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -53,19 +57,27 @@ public void testRemoveSensor() { String entity = "entity"; String operation = "put"; Map tags = new HashMap<>(); -StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags); +final Metrics metrics = new Metrics(); +final Map initialMetrics = Collections.unmodifiableMap(new LinkedHashMap<>(metrics.metrics())); +StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags); Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor1); +Assert.assertEquals(initialMetrics, metrics.metrics()); Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1); streamsMetrics.removeSensor(sensor1a); +Assert.assertEquals(initialMetrics, metrics.metrics()); Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor2); +Assert.assertEquals(initialMetrics, metrics.metrics()); Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor3); +Assert.assertEquals(initialMetrics, metrics.metrics()); + +Assert.assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors); } @Test This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Memory leak in > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > - > > Key: KAFKA-6925 > URL: https://issues.apache.org/jira/browse/KAFKA-6925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Marcin Kuthan >Assignee: John Roesler >Priority: Major > Fix For: 1.1.1 > > > *Note: this issue was fixed incidentally in 2.0, so it is only present in > versions 0.x and 1.x.* > > The retained heap of > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > is surprisingly high for long running job. Over 100MB of heap for every > stream a
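The one-line fix works because parentSensors maps each child sensor to its parent, and without the remove() the map keeps a strong reference to every parent Sensor ever created, even after its metrics are gone. A stripped-down illustration of the corrected pattern (field and method names follow the diff; the class wrapper is illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

class SensorRegistry {
    private final Metrics metrics = new Metrics();
    private final Map<Sensor, Sensor> parentSensors = new HashMap<>();

    void removeSensor(Sensor sensor) {
        metrics.removeSensor(sensor.name());
        Sensor parent = parentSensors.get(sensor);
        if (parent != null) {
            metrics.removeSensor(parent.name());
            parentSensors.remove(sensor); // the added line: without it this map grows forever
        }
    }
}
{code}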
[jira] [Commented] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
[ https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499506#comment-16499506 ] Guozhang Wang commented on KAFKA-6925: -- Merged separate PRs for 0.11.0, 1.0, and 1.1. > Memory leak in > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > - > > Key: KAFKA-6925 > URL: https://issues.apache.org/jira/browse/KAFKA-6925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Marcin Kuthan >Assignee: John Roesler >Priority: Major > Fix For: 0.11.0.3, 1.0.2, 1.1.1 > > > *Note: this issue was fixed incidentally in 2.0, so it is only present in > versions 0.x and 1.x.* > > The retained heap of > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > is surprisingly high for a long-running job: over 100MB of heap for every > stream after a week of uptime, whereas for the same application a few hours > after start the heap takes 2MB. > For the problematic instance, the majority of StreamsMetricsThreadImpl memory is > occupied by hash map entries in parentSensors: over 8000 elements of 100+kB > each. For a fresh instance there are fewer than 200 elements. > Below you can find the retained set report generated from Eclipse MAT, but I'm > not fully sure about its correctness due to the complex object graph in the metrics-related > code. Number of objects in a single > StreamThread$StreamsMetricsThreadImpl instance: > > {code:java} > Class Name | Objects | Shallow Heap > --- > org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232 > org.apache.kafka.common.MetricName | 140,476 | 4,495,232 > org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752 > org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288 > org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288 > org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496 > org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088 > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl| > 1 | 56 > --- > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
[ https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6925: - Fix Version/s: 1.0.2 0.11.0.3 > Memory leak in > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > - > > Key: KAFKA-6925 > URL: https://issues.apache.org/jira/browse/KAFKA-6925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Marcin Kuthan >Assignee: John Roesler >Priority: Major > Fix For: 0.11.0.3, 1.0.2, 1.1.1 > > > *Note: this issue was fixed incidentally in 2.0, so it is only present in > versions 0.x and 1.x.* > > The retained heap of > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > is surprisingly high for a long-running job: over 100MB of heap for every > stream after a week of uptime, whereas for the same application a few hours > after start the heap takes 2MB. > For the problematic instance, the majority of StreamsMetricsThreadImpl memory is > occupied by hash map entries in parentSensors: over 8000 elements of 100+kB > each. For a fresh instance there are fewer than 200 elements. > Below you can find the retained set report generated from Eclipse MAT, but I'm > not fully sure about its correctness due to the complex object graph in the metrics-related > code. Number of objects in a single > StreamThread$StreamsMetricsThreadImpl instance: > > {code:java} > Class Name | Objects | Shallow Heap > --- > org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232 > org.apache.kafka.common.MetricName | 140,476 | 4,495,232 > org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752 > org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288 > org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288 > org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496 > org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088 > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl| > 1 | 56 > --- > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
[ https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6925. -- Resolution: Fixed > Memory leak in > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > - > > Key: KAFKA-6925 > URL: https://issues.apache.org/jira/browse/KAFKA-6925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Marcin Kuthan >Assignee: John Roesler >Priority: Major > Fix For: 0.11.0.3, 1.0.2, 1.1.1 > > > *Note: this issue was fixed incidentally in 2.0, so it is only present in > versions 0.x and 1.x.* > > The retained heap of > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > is surprisingly high for a long-running job: over 100MB of heap for every > stream after a week of uptime, whereas for the same application a few hours > after start the heap takes 2MB. > For the problematic instance, the majority of StreamsMetricsThreadImpl memory is > occupied by hash map entries in parentSensors: over 8000 elements of 100+kB > each. For a fresh instance there are fewer than 200 elements. > Below you can find the retained set report generated from Eclipse MAT, but I'm > not fully sure about its correctness due to the complex object graph in the metrics-related > code. Number of objects in a single > StreamThread$StreamsMetricsThreadImpl instance: > > {code:java} > Class Name | Objects | Shallow Heap > --- > org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232 > org.apache.kafka.common.MetricName | 140,476 | 4,495,232 > org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752 > org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328 > org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496 > org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288 > org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288 > org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496 > org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088 > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl| > 1 | 56 > --- > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6900) Add thenCompose to KafkaFuture
[ https://issues.apache.org/jira/browse/KAFKA-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499565#comment-16499565 ] ASF GitHub Bot commented on KAFKA-6900: --- RichoDemus closed pull request #5080: KAFKA-6900: Add thenCompose to KafkaFuture URL: https://github.com/apache/kafka/pull/5080 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 9cd2e01dc42..d7ee5453e39 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -115,6 +115,15 @@ private void maybeComplete() { */ public abstract KafkaFuture thenApply(BaseFunction function); +/** + * Returns a new KafkaFuture that, when this future completes normally, is executed with this + * futures's result as the argument to the supplied function. + * + * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that + * completes the future. + */ +public abstract KafkaFuture thenCompose(BaseFunction> function); + /** * @see KafkaFuture#thenApply(BaseFunction) * diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index 33916ac952a..f534fe3de97 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.internals; +import org.apache.kafka.common.KafkaFuture; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; @@ -23,8 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.kafka.common.KafkaFuture; - /** * A flexible future which supports call chaining and other asynchronous programming patterns. * This will eventually become a thin shim on top of Java 8's CompletableFuture. 
@@ -70,6 +70,35 @@ public void accept(A a, Throwable exception) { } } +private static class FutureApplicant implements BiConsumer { +private final BaseFunction> function; +private final KafkaFutureImpl future; + +FutureApplicant(BaseFunction> function, KafkaFutureImpl future) { +this.function = function; +this.future = future; +} + +@Override +public void accept(A a, Throwable exception) { +if (exception != null) { +future.completeExceptionally(exception); +} else { +final KafkaFuture b = function.apply(a); +b.whenComplete(new BiConsumer() { +@Override +public void accept(B result, Throwable error) { +if (error != null) { +future.completeExceptionally(error); +} else { +future.complete(result); +} +} +}); +} +} +} + private static class SingleWaiter implements BiConsumer { private R value = null; private Throwable exception = null; @@ -146,6 +175,13 @@ R await(long timeout, TimeUnit unit) return future; } +@Override +public KafkaFuture thenCompose(BaseFunction> function) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +addWaiter(new FutureApplicant<>(function, future)); +return future; +} + public void copyWith(KafkaFuture future, BaseFunction function) { KafkaFutureImpl futureImpl = (KafkaFutureImpl) future; futureImpl.addWaiter(new Applicant<>(function, this)); diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 6f9efca7c66..ebc8ab30d61 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -27,10 +27,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; +import s
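The semantics being added here mirror Java 8's CompletableFuture.thenCompose: applying a future-returning function with thenApply yields a nested future, while thenCompose flattens it. A sketch using the standard-library analogue:

{code:java}
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {
    static CompletableFuture<Integer> lookupLength(String s) {
        return CompletableFuture.supplyAsync(s::length); // an async lookup
    }

    public static void main(String[] args) {
        CompletableFuture<String> name = CompletableFuture.completedFuture("kafka");
        // thenApply would yield CompletableFuture<CompletableFuture<Integer>>;
        // thenCompose flattens it to CompletableFuture<Integer>.
        CompletableFuture<Integer> length = name.thenCompose(ComposeDemo::lookupLength);
        System.out.println(length.join()); // prints 5
    }
}
{code}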
[jira] [Updated] (KAFKA-6841) Add support for Prefixed ACLs
[ https://issues.apache.org/jira/browse/KAFKA-6841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piyush Vijay updated KAFKA-6841: Summary: Add support for Prefixed ACLs (was: Add support for wildcard suffixed ACLs) > Add support for Prefixed ACLs > - > > Key: KAFKA-6841 > URL: https://issues.apache.org/jira/browse/KAFKA-6841 > Project: Kafka > Issue Type: New Feature > Components: admin, security >Reporter: Piyush Vijay >Priority: Major > Fix For: 2.0.0 > > > Kafka supports authorizing access to resources like topics, consumer groups, etc. by way of ACLs. The currently supported semantics of resource and principal names in an ACL definition are either the full resource/principal name or the special wildcard '*', which matches everything. > Kafka should support a way of defining bulk ACLs instead of specifying individual ACLs. > The details for the feature are available here - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+wildcard+suffixed+ACLs] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
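As the feature eventually shipped with KIP-290 in 2.0, kafka-acls gained a --resource-pattern-type flag; a prefixed pattern matches every resource whose name starts with the given prefix. A sketch (principal, prefix, and ZooKeeper address are illustrative):

{code:bash}
# Grant Read on every topic whose name starts with "orders."
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:alice --operation Read \
  --topic orders. --resource-pattern-type prefixed
{code}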
[jira] [Commented] (KAFKA-6709) broker failed to handle request due to OOM
[ https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499656#comment-16499656 ] Dhruvil Shah commented on KAFKA-6709: - Fixed by KAFKA-6927. > broker failed to handle request due to OOM > -- > > Key: KAFKA-6709 > URL: https://issues.apache.org/jira/browse/KAFKA-6709 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.1 >Reporter: Zou Tao >Assignee: Dhruvil Shah >Priority: Critical > Fix For: 2.0.0 > > Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, > normal-kafkaServer-gc.log.0.current.zip, server.properties > > > I have updated to release 1.0.1. > I set up a cluster which has four brokers. > You can find the server.properties in the attachment. > There are about 150 topics and about 4000 partitions in total; the ReplicationFactor is 2. > Connectors are used to write/read data to/from the brokers. > The connector version is 0.10.1. > The average message size is 500B, and there are around 6 messages per second. > One of the brokers keeps reporting OOM and can't handle requests like: > [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request > {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[ > {partition=16,fetch_offset=51198,max_bytes=60728640} > ,\{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} > (kafka.server.KafkaApis) > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101) > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523) > at scala.Option.map(Option.scala:146) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513) > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041) > at > kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54) > at > kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040) > at > 
kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593) > at > kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592) > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609) > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609) > at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601) > at kafka.server.KafkaApis.handle(KafkaApis.scala:99) > and then lots of ISR shrinking (this broker is 1001). > [2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 > broke
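The allocation blows up inside FileRecords.downConvert, i.e. while the broker rewrites batches into an older message format for old fetchers; KAFKA-6927 fixed this by making down-conversion lazy and chunked. Until a broker carries that fix, a common mitigation is to avoid down-conversion entirely by pinning the on-disk format to what the oldest client understands (the version below matches the 0.10.1 connectors described in this report):

{code}
# server.properties: store messages in the 0.10.1 format so fetches from
# 0.10.1 clients need no broker-side down-conversion (at the cost of newer
# format features until all clients are upgraded).
log.message.format.version=0.10.1
{code}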
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499728#comment-16499728 ] Valencia Edna Serrao commented on KAFKA-6335: - Hi [~yuzhih...@gmail.com], I've been tracking this Jira for quite some time now, the failure is no longer occurring on my computer. > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar >Priority: Major > > From > https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > : > {code} > java.lang.AssertionError: expected acls Set(User:36 has Allow permission for > operations: Read from hosts: *, User:7 has Allow permission for operations: > Read from hosts: *, User:21 has Allow permission for operations: Read from > hosts: *, User:39 has Allow permission for operations: Read from hosts: *, > User:43 has Allow permission for operations: Read from hosts: *, User:3 has > Allow permission for operations: Read from hosts: *, User:35 has Allow > permission for operations: Read from hosts: *, User:15 has Allow permission > for operations: Read from hosts: *, User:16 has Allow permission for > operations: Read from hosts: *, User:22 has Allow permission for operations: > Read from hosts: *, User:26 has Allow permission for operations: Read from > hosts: *, User:11 has Allow permission for operations: Read from hosts: *, > User:38 has Allow permission for operations: Read from hosts: *, User:8 has > Allow permission for operations: Read from hosts: *, User:28 has Allow > permission for operations: Read from hosts: *, User:32 has Allow permission > for operations: Read from hosts: *, User:25 has Allow permission for > operations: Read from hosts: *, User:41 has Allow permission for operations: > Read from hosts: *, User:44 has Allow permission for operations: Read from > hosts: *, User:48 has Allow permission for operations: Read from hosts: *, > User:2 has Allow permission for operations: Read from hosts: *, User:9 has > Allow permission for operations: Read from hosts: *, User:14 has Allow > permission for operations: Read from hosts: *, User:46 has Allow permission > for operations: Read from hosts: *, User:13 has Allow permission for > operations: Read from hosts: *, User:5 has Allow permission for operations: > Read from hosts: *, User:29 has Allow permission for operations: Read from > hosts: *, User:45 has Allow permission for operations: Read from hosts: *, > User:6 has Allow permission for operations: Read from hosts: *, User:37 has > Allow permission for operations: Read from hosts: *, User:23 has Allow > permission for operations: Read from hosts: *, User:19 has Allow permission > for operations: Read from hosts: *, User:24 has Allow permission for > operations: Read from hosts: *, User:17 has Allow permission for operations: > Read from hosts: *, User:34 has Allow permission for operations: Read from > hosts: *, User:12 has Allow permission for operations: Read from hosts: *, > User:42 has Allow permission for operations: Read from hosts: *, User:4 has > Allow permission for operations: Read from hosts: *, User:47 has Allow > permission for operations: Read from hosts: *, User:18 has Allow permission > for operations: Read from hosts: *, User:31 has Allow permission for > 
operations: Read from hosts: *, User:49 has Allow permission for operations: > Read from hosts: *, User:33 has Allow permission for operations: Read from > hosts: *, User:1 has Allow permission for operations: Read from hosts: *, > User:27 has Allow permission for operations: Read from hosts: *) but got > Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 > has Allow permission for operations: Read from hosts: *, User:21 has Allow > permission for operations: Read from hosts: *, User:39 has Allow permission > for operations: Read from hosts: *, User:43 has Allow permission for > operations: Read from hosts: *, User:3 has Allow permission for operations: > Read from hosts: *, User:35 has Allow permission for operations: Read from > hosts: *, User:15 has Allow permission for operations: Read from hosts: *, > User:16 has Allow permission for operations: Read from hosts: *, User:22 has > Allow permission for operations: Read from hosts: *, User:26 has Allow > permission for operations: Read from hosts: *, User:11 has Allow permission > for operations: Read from hosts: *, User:38 has Allow permission for > operations: Read f
[jira] [Resolved] (KAFKA-4960) Invalid state store exception
[ https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4960. -- Resolution: Cannot Reproduce > Invalid state store exception > - > > Key: KAFKA-4960 > URL: https://issues.apache.org/jira/browse/KAFKA-4960 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: j yeargers >Priority: Major > Attachments: text.html > > > Attempting to run a windowed aggregation returns this exception: > 2017-03-27 20:14:28,776 [StreamThread-1] WARN > o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING > to NOT_RUNNING > 2017-03-27 20:14:28,776 [StreamThread-1] WARN > o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING > to NOT_RUNNING > Exception in thread "StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=vi_preproc, > partition=1, offset=243574962 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store > %s has closed > at > org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398) > at > org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457) > at > org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30) > at > org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69) > at > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131) > at > org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:84) > at > org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:35) > at > org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamWindowReduceProcessor.process(KStreamWindowReduce.java:94) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) > ... 2 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499755#comment-16499755 ] Guozhang Wang commented on KAFKA-6161: -- Since we have migrated to JDK 8 in 2.0, should we pick this one up again? > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Major > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one wants to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we define these methods as empty. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
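With JDK 8 available, the boilerplate can be removed without extra base classes: default methods on the existing interfaces make configure() and close() optional for implementors. A hedged sketch of that shape (this is the outline of the idea, not the merged API):

{code:java}
import java.io.Closeable;
import java.util.Map;

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do by default
    }

    byte[] serialize(String topic, T data);

    @Override
    default void close() {
        // nothing to do by default
    }
}
{code}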
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499756#comment-16499756 ] ASF GitHub Bot commented on KAFKA-6161: --- guozhangwang closed pull request #4175: KAFKA-6161 add base classes for (De)Serializers with empty conf methods URL: https://github.com/apache/kafka/pull/4175 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java index 267211576b6..1c16b0bfc42 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java @@ -16,22 +16,10 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - -public class ByteArrayDeserializer implements Deserializer { - -@Override -public void configure(Map configs, boolean isKey) { -// nothing to do -} +public class ByteArrayDeserializer extends NoConfDeserializer { @Override public byte[] deserialize(String topic, byte[] data) { return data; } - -@Override -public void close() { -// nothing to do -} } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java index d069e9495e6..f5057b362a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java @@ -16,22 +16,10 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - -public class ByteArraySerializer implements Serializer { - -@Override -public void configure(Map configs, boolean isKey) { -// nothing to do -} +public class ByteArraySerializer extends NoConfSerializer { @Override public byte[] serialize(String topic, byte[] data) { return data; } - -@Override -public void close() { -// nothing to do -} } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java index d41f03c6675..ad22796c07f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java @@ -17,13 +17,8 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; -public class ByteBufferDeserializer implements Deserializer { - -public void configure(Map configs, boolean isKey) { -// nothing to do -} +public class ByteBufferDeserializer extends NoConfDeserializer { public ByteBuffer deserialize(String topic, byte[] data) { if (data == null) @@ -31,8 +26,4 @@ public ByteBuffer deserialize(String topic, byte[] data) { return ByteBuffer.wrap(data); } - -public void close() { -// nothing to do -} } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index c8c369272dd..ee3689a8a88 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -17,13 +17,8 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; -public class ByteBufferSerializer implements Serializer { - -public void configure(Map configs, boolean isKey) { -// nothing to do -} +public class ByteBufferSerializer extends NoConfSerializer { public byte[] serialize(String topic, ByteBuffer data) { if (data == null) @@ -43,8 +38,4 @@ public void configure(Map configs, boolean isKey) { data.rewind(); return ret; } - -public void close() { -// nothing to do -} } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java index 66b07eb5841..b912ef5b40c 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/seria
[jira] [Resolved] (KAFKA-5974) Removed unused parameter in ProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5974. -- Resolution: Fixed Fix Version/s: 2.0.0 This has been resolved in the 2.0 release. > Removed unused parameter in ProcessorContext > - > > Key: KAFKA-5974 > URL: https://issues.apache.org/jira/browse/KAFKA-5974 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Matthias J. Sax >Assignee: siva santhalingam >Priority: Major > Labels: needs-kip > Fix For: 2.0.0 > > > The method {{ProcessorContext#register}} has a parameter, {{loggingEnabled}}, that is unused. We should remove it eventually. However, this is a breaking change to a public API, so we need a KIP and a proper upgrade path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
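For context, the change amounts to dropping the flag from the signature; whether a store is change-logged is decided when the store itself is built, not at registration time. A sketch of the before/after shape (the interface wrapper is illustrative):

{code:java}
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;

interface ProcessorContextSketch {
    // Before (0.11.x / 1.x): the flag is accepted but ignored.
    void register(StateStore store, boolean loggingEnabled, StateRestoreCallback restoreCallback);

    // After (2.0): the unused flag is gone.
    void register(StateStore store, StateRestoreCallback restoreCallback);
}
{code}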
[jira] [Commented] (KAFKA-5924) Add the compose method to the Kafka Stream API
[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499759#comment-16499759 ] Guozhang Wang commented on KAFKA-5924: -- [~Crystark] Does the new API introduced in 1.0 ({{Topology}} and {{StreamsBuilder}}) satisfy this requirement? > Add the compose method to the Kafka Stream API > -- > > Key: KAFKA-5924 > URL: https://issues.apache.org/jira/browse/KAFKA-5924 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Laurent T >Priority: Minor > Labels: needs-kip > > Hi, > I'm referencing RxJava for its [compose > method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators], > which is very handy. It would be great if the Streams API gave us > something similar. It's pretty easy to implement and brings much more > clarity to the code (it avoids breaking the linearity of the code when you > want to reuse parts of the stream topology), e.g. > Without compose: > {code:java} > TopologyUtils > .myUtil(topology > .map(...) > .flatMap(...) > .through(...)) > .map(...) > .to(...); > {code} > With compose: > {code:java} > topology > .map(...) > .flatMap(...) > .through(...) > .compose(TopologyUtils::myUtil) > .map(...) > .to(...); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
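Until such an operator exists, the linearity can largely be recovered with a small generic helper; a sketch (helper name and types are hypothetical):

{code:java}
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;

final class TopologyUtils {
    private TopologyUtils() { }

    // Hypothetical compose-style helper: applies a reusable topology fragment.
    static <K, V, K2, V2> KStream<K2, V2> compose(
            KStream<K, V> stream,
            Function<KStream<K, V>, KStream<K2, V2>> fragment) {
        return fragment.apply(stream);
    }
}
{code}

Call sites then read compose(stream.map(...), TopologyUtils::myUtil).map(...), keeping the chain linear apart from a single wrapping call.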
[jira] [Resolved] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
[ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-3429. -- Resolution: Fixed With the new API in KIP-182 this is already resolved: the serdes are inherited from the parent if they are not overridden by users in Materialized / Serialized. > Remove Serdes needed for repartitioning in KTable stateful operations > - > > Key: KAFKA-3429 > URL: https://issues.apache.org/jira/browse/KAFKA-3429 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: api > > Currently, in KTable aggregate operations where a repartition may be needed (since the aggregation key may not be the same as the original primary key), we require users to provide serdes (defaulting to the configured ones) for reading from / writing to the internally created repartition topic. However, these are not necessary, since for all KTable instances, whether generated from topics directly: > {code}table = builder.table(...){code} > or from aggregation operations: > {code}table = stream.aggregate(...){code} > there is already a serde provided for materializing the data, and hence the same serde can be reused when the resulting KTable is involved in future aggregation operations. For example: > {code} > table1 = stream.aggregateByKey(serde); > table2 = table1.aggregate(aggregator, selector, originalSerde, > aggregateSerde); > {code} > We would not need to require users to specify the "originalSerde" in > table1.aggregate, since it could always reuse the "serde" from > stream.aggregateByKey, which is used to materialize the table1 object. > To get rid of it, implementation-wise we need to carry the serde > information along with the KTableImpl instance so that it can be reused in a > future operation that requires repartitioning. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
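Concretely, with the 1.x API the serdes declared when the table is created are carried along, so a downstream aggregation that keeps the key type needs none repeated. A sketch against the 1.1 API (topic name and types are illustrative):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class SerdeInheritance {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> table = builder.table(
                "input", Consumed.with(Serdes.String(), Serdes.Long()));
        // The repartition for groupBy/count reuses the serdes declared above;
        // nothing needs to be specified again here.
        KTable<String, Long> counts = table
                .groupBy((key, value) -> KeyValue.pair(key, value))
                .count();
    }
}
{code}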
[jira] [Resolved] (KAFKA-5618) Kafka stream not receiving any topic/partitions/records info
[ https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5618. -- Resolution: Cannot Reproduce > Kafka stream not receiving any topic/partitions/records info > -- > > Key: KAFKA-5618 > URL: https://issues.apache.org/jira/browse/KAFKA-5618 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: rtp-kafkastreams.log, rtp-kafkastreams2.log, > rtp-kafkastreams3.log > > > I have 3 brokers and 3 stream consumers. > There are 360 partitions, and I am not able to bring up the streams successfully even after several retries. > I have attached the logs. > There are other topics which have around 16 partitions, and they are consumed successfully by the Kafka client. > When I tried getting a thread dump using jstack, the process did not respond: > Attaching to process ID 10663, please wait... > Debugger attached successfully. > Server compiler detected. > JVM version is 24.141-b02 > Deadlock Detection: > java.lang.RuntimeException: Unable to deduce type of thread from address > 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, > JvmtiAgentThread, or SurrogateLockerThread) > at > sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162) > at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150) > at > sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149) > at > sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56) > at > sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5393) in-memory state store memory issue
[ https://issues.apache.org/jira/browse/KAFKA-5393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499769#comment-16499769 ] Guozhang Wang commented on KAFKA-5393: -- Is there an easy way to reproduce this issue for further investigation? > in-memory state store memory issue > --- > > Key: KAFKA-5393 > URL: https://issues.apache.org/jira/browse/KAFKA-5393 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Kevin Chen >Priority: Major > Attachments: Screen Shot 2017-06-06 at 9.45.42 AM.png > > > We are running 2 Kafka Streams instances that use the low-level Processor API, and > we are using the in-memory state store. > When we upgrade instance A, all of its tasks move to instance B, and the two > instances re-balance after A is back up and running. > The problem is that, even after the re-balance, the memory in instance A > did not drop to the previous level (the load is about the same). > See the screen shot below. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5058) Add a sensor to KafkaStreams to track records that have been dropped due to having a null key
[ https://issues.apache.org/jira/browse/KAFKA-5058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5058. -- Resolution: Fixed Assignee: John Roesler Fix Version/s: 2.0.0 This has been resolved in https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics, coming in 2.0 > Add a sensor to KafkaStreams to track records that have been dropped due to > having a null key > - > > Key: KAFKA-5058 > URL: https://issues.apache.org/jira/browse/KAFKA-5058 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: John Roesler >Priority: Minor > Fix For: 2.0.0 > > > In various places, e.g., {{KTableSource}}, {{StoreChangelogReader}}, > {{KStreamWindowAggregate}}, etc., we drop records with a null key. We should > probably consider creating a sensor to track records that have been dropped. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
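As a usage note on the KIP-274 metrics mentioned in the resolution, the sketch below polls KafkaStreams#metrics() for the new skipped-records sensor. The metric name and group ("skipped-records-total" in "stream-metrics") are taken from the KIP and should be treated as assumptions to verify against the running application's metrics output:
{code}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class SkippedRecordsProbe {
    // Scan the registered metrics and print the skipped-records totals.
    public static void logSkippedRecords(final KafkaStreams streams) {
        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            final MetricName name = entry.getKey();
            // Name/group assumed from KIP-274; may differ between versions.
            if ("skipped-records-total".equals(name.name())
                    && "stream-metrics".equals(name.group())) {
                System.out.println(name.tags() + " skipped so far: "
                        + entry.getValue().metricValue());
            }
        }
    }
}
{code}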
[jira] [Resolved] (KAFKA-5739) Rewrite KStreamPeekTest at processor level avoiding driver usage
[ https://issues.apache.org/jira/browse/KAFKA-5739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5739. -- Resolution: Fixed Fix Version/s: 2.0.0 This is already fixed in trunk, coming in 2.0.0 > Rewrite KStreamPeekTest at processor level avoiding driver usage > > > Key: KAFKA-5739 > URL: https://issues.apache.org/jira/browse/KAFKA-5739 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 2.0.0 > > > Hi, > as already done for the {{KStreamPrintTest}}, we could remove the usage of > {{KStreamTestDriver}} from the {{KStreamPeekTest}} as well, testing it at the > processor level rather than the stream level. > My proposal is to: > * create the {{KStreamPeek}} instance providing the action which fills a > collection, as already happens today > * test both {{forwardDownStream}} values, true and false > * use the {{MockProcessorContext}} class to override the {{forward}} > method, filling a streamObserved collection as happens today when > {{forwardDownStream}} is true; check that {{forward}} isn't called > when {{forwardDownStream}} is false (otherwise the test fails) > Thanks, > Paolo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
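A rough sketch of the processor-level test proposed above follows. It assumes the internal KStreamPeek(action, forwardDownStream) constructor, and it substitutes the public MockProcessorContext from KIP-267 (new in 2.0), which captures forwards itself, for the {{forward}} override the ticket describes; both classes' exact signatures may differ between versions:
{code}
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.junit.Test;

public class KStreamPeekProcessorLevelTest {

    @Test
    public void shouldObserveRecordAndForwardDownstream() {
        final List<KeyValue<String, Integer>> observed = new ArrayList<>();

        // The peek action fills the collection, as in the proposal; the
        // boolean flag enables forwarding downstream.
        final Processor<String, Integer> processor =
                new KStreamPeek<String, Integer>(
                        (key, value) -> observed.add(KeyValue.pair(key, value)),
                        true
                ).get();

        final MockProcessorContext context = new MockProcessorContext();
        processor.init(context);
        processor.process("a", 1);

        assertEquals(1, observed.size());
        // The mock context records forwarded records itself; with
        // forwardDownStream=false this list would stay empty.
        assertEquals(1, context.forwarded().size());
    }
}
{code}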
[jira] [Resolved] (KAFKA-6355) transient failure in org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies
[ https://issues.apache.org/jira/browse/KAFKA-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6355. -- Resolution: Cannot Reproduce Have not seen this issue for a while, closing for now. > transient failure in > org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies > -- > > Key: KAFKA-6355 > URL: https://issues.apache.org/jira/browse/KAFKA-6355 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: huxihx >Priority: Major > > Got transient failure during running > 'org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies' > Error Message > java.lang.AssertionError: Condition not met within timeout 3. Did not > receive all 20 records from topic singlePartitionOutputTopic > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. Did not > receive all 20 records from topic singlePartitionOutputTopic > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:195) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:165) > at > org.apache.kafka.streams.integration.EosIntegrationTest.runSimpleCopyTest(EosIntegrationTest.java:183) > at > org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies(EosIntegrationTest.java:135) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66) > at > 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy1.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108) > at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessor
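For context on the assertion above: waitUntilMinKeyValueRecordsReceived polls a condition until a timeout elapses and then fails with the quoted "Condition not met within timeout" message. The standalone helper below is a minimal sketch of that poll-until-timeout pattern, not Kafka's actual TestUtils implementation:
{code}
import java.util.function.BooleanSupplier;

public final class WaitUtil {
    // Retry a predicate until it holds or the timeout elapses, then fail
    // with the supplied details -- the same shape of failure seen above.
    public static void waitForCondition(final BooleanSupplier condition,
                                        final long maxWaitMs,
                                        final String conditionDetails) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(100L); // poll interval; chosen arbitrarily here
        }
        throw new AssertionError("Condition not met within timeout " + maxWaitMs
                + ". " + conditionDetails);
    }
}
{code}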
[jira] [Resolved] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.
[ https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6092. -- Resolution: Fixed Have double-checked the docs for the coming 2.0 release, which are clear on the punctuation types, the expected punctuation behavior, and the time parameter that is passed in. > Time passed in punctuate call is currentTime, not punctuate schedule time. > --- > > Key: KAFKA-6092 > URL: https://issues.apache.org/jira/browse/KAFKA-6092 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Stephane Maarek >Priority: Major > > The Javadoc specifies that for a Transformer, calling context.schedule calls > punctuate every 1000ms. This is not entirely accurate: if no data is > received for a while, punctuate won't be called. > {code} > * void init(ProcessorContext context) { > * this.context = context; > * this.state = context.getStateStore("myTransformState"); > * context.schedule(1000); // call #punctuate() each > 1000ms > * } > {code} > When you receive new data, say after 20 seconds, punctuate plays catch-up > and is called 20 times upon reception of the new data. > The signature of punctuate is > {code} > * KeyValue punctuate(long timestamp) { > * // can access this.state > * // can emit as many new KeyValue pairs as required via > this.context#forward() > * return null; // don't return result -- can also be > "new KeyValue()" > * } > {code} > but the timestamp being passed is the current timestamp at the time of the call to > punctuate, not the time at which the punctuation was scheduled. It is very confusing, > and I think the timestamp should represent the one at which the punctuate > should have been scheduled. Getting the current timestamp is not adding much > information, as it can easily be obtained using System.currentTimeMillis(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
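To make the punctuation types referenced in the resolution concrete, here is a sketch using the explicit PunctuationType introduced by KIP-138 (available since 1.0 and documented for 2.0). The processor class and printed output are illustrative; the schedule(interval, type, punctuator) overload is the post-KIP-138 API:
{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class PunctuateExample extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // STREAM_TIME punctuations advance with record timestamps, hence the
        // catch-up behavior after a data gap described in the ticket;
        // WALL_CLOCK_TIME fires on system time regardless of traffic.
        context.schedule(1000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> System.out.println("punctuate at " + timestamp));
    }

    @Override
    public void process(final String key, final String value) {
        context().forward(key, value);
    }
}
{code}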