[jira] [Created] (KAFKA-8411) Option to keep tombstones forever in compacted logs
Lukas Welte created KAFKA-8411: -- Summary: Option to keep tombstones forever in compacted logs Key: KAFKA-8411 URL: https://issues.apache.org/jira/browse/KAFKA-8411 Project: Kafka Issue Type: Improvement Components: log cleaner Reporter: Lukas Welte Log compaction is a great way to balance keeping all data in a topic against reducing the number of messages in it. However, delete tombstones are cleaned up after a certain period. For some use cases we would really like to keep the latest message for each key, which means the tombstone for that key should also be kept forever. An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this behavior would be great. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
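For context, the relevant topic-level settings as they exist today look like this (a sketch; the -1 value is the proposed extension and is not supported as of this report):

```properties
# Topic-level settings for a compacted topic (current Kafka behavior).
cleanup.policy=compact
# Tombstones become eligible for removal this long after the record
# enters the clean portion of the log; default is 86400000 (24 hours).
delete.retention.ms=86400000
# Proposed in this ticket (hypothetical, not supported today):
# delete.retention.ms=-1   would keep tombstones forever
```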
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846512#comment-16846512 ] Vivek Yadav commented on KAFKA-4084: [~jiaxinye] Did it work for you? > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. 
> One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
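The batching idea in the proposed fix could be sketched roughly like this (a hypothetical illustration, not the actual controller code; the class and method names are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed bounded rebalancing: instead of
// electing preferred leaders for every imbalanced partition at once,
// process them in batches of a configurable size, waiting for each
// batch's elections to complete before starting the next one. Only the
// batching logic is shown; the election call itself is where the
// controller (or AdminClient) would do the actual work.
public class BatchedRebalance {

    // Split the imbalanced partitions into batches of at most batchSize.
    static List<List<String>> toBatches(List<String> partitions, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i += batchSize) {
            batches.add(new ArrayList<>(
                partitions.subList(i, Math.min(i + batchSize, partitions.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> imbalanced = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        // Each batch would be handed to one preferred-leader election
        // round; replica fetchers are only disturbed for batchSize
        // partitions at a time instead of all of them.
        List<List<String>> batches = toBatches(imbalanced, 2);
        System.out.println(batches.size());
    }
}
```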
[jira] [Created] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
Sebastiaan created KAFKA-8412: - Summary: Still a nullpointer exception thrown on shutdown while flushing before closing producers Key: KAFKA-8412 URL: https://issues.apache.org/jira/browse/KAFKA-8412 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.1.1 Reporter: Sebastiaan I found a closed issue and replied there, but decided to open one myself because, although they're related, they're slightly different. The original issue is at https://issues.apache.org/jira/browse/KAFKA-7678 The fix there was to add a null check around closing a producer, because in some cases the producer is already null at that point (it has already been closed). In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called before closing. This is in the log: {code:java} message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error: logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} Followed by: {code:java} message: task [1_26] Could not close task due to the following 
error: logger_name: org.apache.kafka.streams.processor.internals.StreamTask java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that:

{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
        this.producer.close();
        this.producer = null;
    }
    this.checkForException();
}
{code}

Seems to my (ignorant) eye that the flush method should also be wrapped in a null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
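The suggested fix can be sketched like this (a simplified stand-in, not the real RecordCollectorImpl; the nested Producer interface is a hypothetical placeholder for KafkaProducer):

```java
// Hypothetical, simplified sketch of the reporter's suggestion: guard
// flush() with the same null check that close() already has, so that
// flushing after the producer has been closed and nulled out no longer
// throws a NullPointerException.
public class SafeCollector {
    interface Producer { void flush(); void close(); } // stand-in for KafkaProducer

    private Producer producer;

    SafeCollector(Producer producer) { this.producer = producer; }

    public void flush() {
        if (producer != null) {   // the guard missing in 2.1.1's flush()
            producer.flush();
        }
    }

    public void close() {
        if (producer != null) {   // the guard added by KAFKA-7678
            producer.close();
            producer = null;
        }
    }

    public static void main(String[] args) {
        SafeCollector c = new SafeCollector(new Producer() {
            public void flush() {}
            public void close() {}
        });
        c.close();
        c.flush(); // without the guard this path would throw an NPE
        System.out.println("no NPE");
    }
}
```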
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566 ] Andrew commented on KAFKA-5998: --- I have just seen this error with kafka 2.2.0 and confluent 5.2.1. We are running in docker on kubernetes, and the same container works when run against some topics but not others, it seems. {code:java} java.io.FileNotFoundException: /tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) {code} > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: 
5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams instance running. I have been > running them for two days under a load of around 2500 msgs per second. On the > third day I am getting the below exception for some of the partitions; I have > 16 partitions and only 0_0 and 0_1 give this error: > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSH
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566 ] Andrew edited comment on KAFKA-5998 at 5/23/19 9:29 AM: I have just seen this error with kafka 2.2.0 and confluent 5.2.1. We are running in docker on kubernetes, and the same container works when run against some topics but not others, it seems. {code:java} java.io.FileNotFoundException: /tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) {code} When I `exec` into the running container I see this: {code} /tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather # ls 2_0 2_10 2_12 2_14 2_16 2_18 2_2 2_4 2_6 2_8 3_0 3_10 3_12 3_14 3_16 3_18 3_2 3_4 3_6 3_8 {code}
[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846598#comment-16846598 ] Omkar Mestry commented on KAFKA-7245: - [~mjsax] The core streams module also has some classes that use the put method with two parameters. [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java] [https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java] > Deprecate WindowStore#put(key, value) > - > > Key: KAFKA-7245 > URL: https://issues.apache.org/jira/browse/KAFKA-7245 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Omkar Mestry >Priority: Minor > Labels: needs-kip, newbie > > We want to remove `WindowStore#put(key, value)` – for this, we first need to > deprecate it via a KIP and remove it later. > Instead of using `WindowStore#put(key, value)` we need to migrate code to > specify the timestamp explicitly using `WindowStore#put(key, value, > timestamp)`. The current code base already uses the explicit call to set the > timestamp in production code. The simplified `put(key, value)` is only used > in tests, and thus we would only need to update those tests. > [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
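The proposed migration can be sketched as follows (a minimal hypothetical Store interface, not the real org.apache.kafka.streams.state.WindowStore, just to show the deprecation pattern and the explicit-timestamp call):

```java
// Hypothetical sketch of the deprecation this ticket proposes: the
// two-argument put is marked @Deprecated and callers migrate to the
// three-argument form that passes the window start timestamp explicitly.
public class PutMigration {
    interface Store<K, V> {
        void put(K key, V value, long windowStartTimestamp);

        @Deprecated
        default void put(K key, V value) {
            // old behavior sketch: fall back to "now" as the timestamp
            put(key, value, System.currentTimeMillis());
        }
    }

    public static void main(String[] args) {
        final long[] seen = new long[1];
        Store<String, Long> store = (k, v, ts) -> seen[0] = ts;
        // migrated call: timestamp is explicit, not implicit
        store.put("key", 1L, 1558569600000L);
        System.out.println(seen[0]);
    }
}
```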
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566 ] Andrew edited comment on KAFKA-5998 at 5/23/19 11:32 AM: - I have just seen this error with kafka 2.2.0 and confluent 5.2.1. We are running in docker on kubernetes, and the same container works when run against some topics but not others it seems. {code:java} java.io.FileNotFoundException: /tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.(FileOutputStream.java:213) at java.io.FileOutputStream.(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) {code} When I `exec` into the running container I see this : {code:java} /tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather # ls 2_0 2_10 2_12 2_14 2_16 2_18 2_2 2_4 2_6 2_8 3_0 3_10 3_12 3_14 3_16 3_18 3_2 3_4 3_6 3_8 {code} Also I see 
{code} 09:55:49,898 INFO org.apache.kafka.streams.processor.internals.StateDirectory - stream-thread [kafka-streams-join-to-nearest-spm-draft-position-weather-404a240b-1e95-4587-80bf-686047658254-CleanupThread] Deleting obsolete state directory 0_1 for task 0_1 as 600898ms has elapsed (cleanup delay is 60ms). {code} And I see the following in the StreamsConfig log output: {code} state.cleanup.delay.ms = 60 state.dir = /tmp/kafka-streams {code}
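The StreamsConfig output above shows state.cleanup.delay.ms set to 60, i.e. 60 milliseconds, so the CleanupThread treats task directories as obsolete almost immediately; that may be related to the missing checkpoint files. A minimal sketch of restoring the default delay, using plain java.util.Properties with the documented string keys (the wrapper class is invented for illustration; 600000 ms is the documented default):

```java
import java.util.Properties;

// Sketch: set the state-directory cleanup delay back to its default so
// the CleanupThread does not delete task directories between commits.
public class CleanupDelayConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("state.dir", "/tmp/kafka-streams");
        // The log above shows this was set to 60 (ms); the default is
        // 600000 (10 minutes).
        props.put("state.cleanup.delay.ms", "600000");
        System.out.println(props.getProperty("state.cleanup.delay.ms"));
    }
}
```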
[jira] [Created] (KAFKA-8413) Add possibility to do repartitioning on KStream
Levani Kokhreidze created KAFKA-8413: Summary: Add possibility to do repartitioning on KStream Key: KAFKA-8413 URL: https://issues.apache.org/jira/browse/KAFKA-8413 Project: Kafka Issue Type: New Feature Components: streams Reporter: Levani Kokhreidze Attachments: topology-1.png, topology-2.png Consider the following code:

{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey((key, value) -> value);

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-1")
    );

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-2")
    );
{code}

This code will generate the following topology:

{code:java}
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-00 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-KEY-SELECT-01 (stores: [])
      --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
      <-- KSTREAM-SOURCE-00
    Processor: KSTREAM-FILTER-04 (stores: [])
      --> KSTREAM-SINK-03
      <-- KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-FILTER-08 (stores: [])
      --> KSTREAM-SINK-07
      <-- KSTREAM-KEY-SELECT-01
    Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
      <-- KSTREAM-FILTER-04
    Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
      <-- KSTREAM-FILTER-08
  Sub-topology: 1
    Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
      --> KSTREAM-AGGREGATE-02
    Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-05
  Sub-topology: 2
    Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
      --> KSTREAM-AGGREGATE-06
    Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-09
{code}

Kafka Streams creates a separate repartition topic for each `groupByKey` operation. In this example the two repartition topics are not really necessary, and the processing could be done with one sub-topology. 
In the DSL, a Kafka Streams user may specify the repartition topic manually using the *KStream#through* method:

{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
    .stream("input-topic")
    .selectKey((key, value) -> value)
    .through("repartition-topic");

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-1")
    );

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-2")
    );
{code}

{code:java}
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-00 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-KEY-SELECT-01 (stores: [])
      --> KSTREAM-SINK-02
      <-- KSTREAM-SOURCE-00
    Sink: KSTREAM-SINK-02 (topic: repartition-topic)
      <-- KSTREAM-KEY-SELECT-01
  Sub-topology: 1
    Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
      --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
    Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-03
    Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-03
{code}

While this makes it possible to optimize a Kafka Streams application, the user still has to create the repartition topic manually, with the correct number of partitions based on the input topic. It would be great if the DSL had something like a *repartition()* operation on *KStream* that could generate the repartition topic on demand. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846740#comment-16846740 ] Levani Kokhreidze commented on KAFKA-8413: -- Happy to work on a KIP if this feature makes sense. > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-8413: - Description: Consider following code: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) .selectKey((key, value) -> value); streamByProfileId .groupByKey() .aggregate( () -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-1") ); streamByProfileId .groupByKey() .aggregate( () -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-2") ); {code} This code will generate following topology: {code:java} Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-00 (topics: [input-topic]) --> KSTREAM-KEY-SELECT-01 Processor: KSTREAM-KEY-SELECT-01 (stores: []) --> KSTREAM-FILTER-04, KSTREAM-FILTER-08 <-- KSTREAM-SOURCE-00 Processor: KSTREAM-FILTER-04 (stores: []) --> KSTREAM-SINK-03 <-- KSTREAM-KEY-SELECT-01 Processor: KSTREAM-FILTER-08 (stores: []) --> KSTREAM-SINK-07 <-- KSTREAM-KEY-SELECT-01 Sink: KSTREAM-SINK-03 (topic: store-1-repartition) <-- KSTREAM-FILTER-04 Sink: KSTREAM-SINK-07 (topic: store-2-repartition) <-- KSTREAM-FILTER-08 Sub-topology: 1 Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition]) --> KSTREAM-AGGREGATE-02 Processor: KSTREAM-AGGREGATE-02 (stores: [store-1]) --> none <-- KSTREAM-SOURCE-05 Sub-topology: 2 Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition]) --> KSTREAM-AGGREGATE-06 Processor: KSTREAM-AGGREGATE-06 (stores: [store-2]) --> none <-- KSTREAM-SOURCE-09 {code} Kafka Streams creates two repartition topics for each `groupByKey` operation. In this example, two repartition topics are not really necessary and processing can be done with one sub-topology. 
A Kafka Streams user may, in the DSL, specify the repartition topic manually using the *KStream#through* method: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input-topic") .selectKey((key, value) -> value) .through("repartition-topic"); streamByProfileId .groupByKey() .aggregate( () -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-1") ); streamByProfileId .groupByKey() .aggregate( () -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-2") ); {code} This version produces the following topology: {code:java} Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-00 (topics: [input-topic]) --> KSTREAM-KEY-SELECT-01 Processor: KSTREAM-KEY-SELECT-01 (stores: []) --> KSTREAM-SINK-02 <-- KSTREAM-SOURCE-00 Sink: KSTREAM-SINK-02 (topic: repartition-topic) <-- KSTREAM-KEY-SELECT-01 Sub-topology: 1 Source: KSTREAM-SOURCE-03 (topics: [repartition-topic]) --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05 Processor: KSTREAM-AGGREGATE-04 (stores: [store-1]) --> none <-- KSTREAM-SOURCE-03 Processor: KSTREAM-AGGREGATE-05 (stores: [store-2]) --> none <-- KSTREAM-SOURCE-03 {code} While this makes it possible to optimize a Kafka Streams application, the user still has to create the repartition topic manually, with the correct number of partitions derived from the input topic. It would be great if the DSL offered something like a *repartition()* operation on *KStream* that generates the repartition topic on the user's behalf. 
[jira] [Commented] (KAFKA-8411) Option to keep tombstones forever in compacted logs
[ https://issues.apache.org/jira/browse/KAFKA-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846775#comment-16846775 ] Matthias J. Sax commented on KAFKA-8411: Well. The config is of type long. If you set it to Long.MAX_VALUE, you can configure many millions of years of retention, which is effectively forever. This should already let you do what you want, even without this ticket. We can still allow -1, but the net effect would not change anything. > Option to keep tombstones forever in compacted logs > > > Key: KAFKA-8411 > URL: https://issues.apache.org/jira/browse/KAFKA-8411 > Project: Kafka > Issue Type: Improvement > Components: log cleaner >Reporter: Lukas Welte >Priority: Major > > Log compaction is great to balance between having all data in a topic and > reducing the amount of messages in a topic. > However the delete tombstones are cleaned after a certain period. For some > use cases we would really like to keep the latest message of each key, which > means also the tombstone of that ID should be kept forever. > An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this > functionality would be great. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
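A quick sanity check of the suggestion above (a standalone sketch, not Kafka code): converting Long.MAX_VALUE milliseconds into years shows why the maximum long value for _delete.retention.ms_ is effectively forever.

```java
public class Main {
    // delete.retention.ms is a long, measured in milliseconds.
    // Convert Long.MAX_VALUE ms to years (ignoring leap years).
    public static double maxRetentionYears() {
        double msPerYear = 1000.0 * 60 * 60 * 24 * 365;
        return Long.MAX_VALUE / msPerYear;
    }

    public static void main(String[] args) {
        // roughly 290 million years of retention
        System.out.printf("%.0f years%n", maxRetentionYears());
    }
}
```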
[jira] [Updated] (KAFKA-8411) Option to keep tombstones forever in compacted logs
[ https://issues.apache.org/jira/browse/KAFKA-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8411: --- Priority: Minor (was: Major) > Option to keep tombstones forever in compacted logs > > > Key: KAFKA-8411 > URL: https://issues.apache.org/jira/browse/KAFKA-8411 > Project: Kafka > Issue Type: Improvement > Components: log cleaner >Reporter: Lukas Welte >Priority: Minor > > Log compaction is great to balance between having all data in a topic and > reducing the amount of messages in a topic. > However the delete tombstones are cleaned after a certain period. For some > use cases we would really like to keep the latest message of each key, which > means also the tombstone of that ID should be kept forever. > An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this > functionality would be great. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8414) org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang
dan norwood created KAFKA-8414: -- Summary: org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang Key: KAFKA-8414 URL: https://issues.apache.org/jira/browse/KAFKA-8414 Project: Kafka Issue Type: Bug Reporter: dan norwood caveat: this only happens on AMD Epyc machines with >=48 cpus. i have below a bunch of machine info from various `*a.*` aws instance sizes i ran against. i noticed what seems like a deadlock when running `org.apache.kafka.common.metrics.MetricsTest` on an aws instance with 96 vCPUs (specifically an m5a.24xlarge). after some debugging it seems like the offending code is [https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L776-L778] {code:java} public void run() { try { while (alive.get()) { op.run(); } } catch (Throwable t) { log.error("Metric {} failed with exception", opName, t); } } {code} since the `op.run()` methods are all synchronized, we end up hammering the lock nonstop. after adding some logging i saw steadily increasing wait times for entry into each synchronized block. so this is not *really* a deadlock or hang, but a progressive slowdown that makes the test unrunnable. the offending op seems to be [https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L747] {code:java} Future reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report", () -> reporter.processMetrics())); {code} possible fix: adding a `Thread.sleep(0, 1)` inside the runloop for `ConcurrentMetricOperation` seems to allow the test to pass. 
but i'm not sure that it wouldn't mask an issue that the test is meant to detect Good: t3a.large ``` Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 2 On-line CPU(s) list: 0,1 Thread(s) per core: 2 Core(s) per socket: 1 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2200.116 BogoMIPS: 4400.23 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0,1 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr arat npt nrip_save ``` t3a.2xlarge ``` Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2199.916 BogoMIPS: 4399.83 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0-7 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap clflushopt sha_ni 
xsaveopt xsavec xgetbv1 clzero xsaveerptr arat npt nrip_save ``` m5a.4xlarge ``` Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 16 On-line CPU(s) list: 0-15 Thread(s) per core: 2 Core(s) per socket: 8 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2585.550 BogoMIPS: 4399.98 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0-15 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat p
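The proposed `Thread.sleep(0, 1)` workaround can be sketched outside the Kafka test suite. This is a minimal stand-in (the names `ops`, `lock`, and `loop` are illustrative, not from MetricsTest) showing the back-off inside a loop that otherwise hammers a synchronized operation:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public static final AtomicLong ops = new AtomicLong();
    private static final Object lock = new Object();

    // stand-in for a synchronized op.run(): all threads contend on one lock
    static void op() {
        synchronized (lock) {
            ops.incrementAndGet();
        }
    }

    static Runnable loop(AtomicBoolean alive) {
        return () -> {
            try {
                while (alive.get()) {
                    op();
                    Thread.sleep(0, 1); // the suggested back-off between iterations
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean alive = new AtomicBoolean(true);
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(loop(alive));
            threads[i].start();
        }
        Thread.sleep(200);   // let the workers run briefly
        alive.set(false);
        for (Thread t : threads) {
            t.join();
        }
        if (ops.get() == 0) {
            throw new AssertionError("no operations ran");
        }
    }
}
```

Without the sleep, each thread reacquires the monitor immediately after releasing it, which is what produces the ever-growing entry wait times described above.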
[jira] [Updated] (KAFKA-8414) org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang
[ https://issues.apache.org/jira/browse/KAFKA-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dan norwood updated KAFKA-8414: --- Description: caveat: this only happens on AMD Epyc machines with >=48 cpus. i have below a bunch of machine info from various `*a.*` aws instance sizes i ran against. all tests were using `OpenJDK Runtime Environment (build 1.8.0_201-b09)` and `Amazon Linux 2 AMI 2.0.20190508 x86_64 HVM gp2` i noticed what seems like a deadlock when running `org.apache.kafka.common.metrics.MetricsTest` on an aws instance with 96 vCPUs (specifically an m5a.24xlarge). after some debugging it seems like the offending code is [https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L776-L778] {code:java} public void run() { try { while (alive.get()) { op.run(); } } catch (Throwable t) { log.error("Metric {} failed with exception", opName, t); } } {code} since the `op.run()` methods are all synchronized, we end up hammering the lock nonstop. after adding some logging i saw steadily increasing wait times for entry into each synchronized block. so this is not *really* a deadlock or hang, but a progressive slowdown that makes the test unrunnable. the offending op seems to be [https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L747] {code:java} Future reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report", () -> reporter.processMetrics())); {code} possible fix: adding a `Thread.sleep(0, 1)` inside the runloop for `ConcurrentMetricOperation` seems to allow the test to pass. 
but i'm not sure that it wouldn't mask an issue that the test is meant to detect Good: t3a.large {noformat} Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 2 On-line CPU(s) list: 0,1 Thread(s) per core: 2 Core(s) per socket: 1 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2200.116 BogoMIPS: 4400.23 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0,1 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr arat npt nrip_save{noformat} t3a.2xlarge {noformat} Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2199.916 BogoMIPS: 4399.83 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0-7 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap 
clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr arat npt nrip_save{noformat} m5a.4xlarge {noformat} Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 16 On-line CPU(s) list: 0-15 Thread(s) per core: 2 Core(s) per socket: 8 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 1 Model name: AMD EPYC 7571 Stepping: 2 CPU MHz: 2585.550 BogoMIPS: 4399.98 Hypervisor vendor: KVM Virtualization type: full L1d cache: 32K L1i cache: 64K L2 cache: 512K L3 cache: 8192K NUMA node0 CPU(s): 0-15 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep
[jira] [Created] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation
Konstantine Karantasis created KAFKA-8415: - Summary: Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation Key: KAFKA-8415 URL: https://issues.apache.org/jira/browse/KAFKA-8415 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Konstantine Karantasis Assignee: Konstantine Karantasis Fix For: 2.3.0 Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} were recently added in Connect as plugins that can be loaded in class loading isolation. However, the interface itself was not excluded from isolation, which results in definition conflicts. Any interface that is considered a base Connect plugin interface needs to be excluded from isolation (it is considered a "system" type). Here's the exception: {code:java} [2019-05-23 15:16:57,802] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:84) java.util.ServiceConfigurationError: org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: Provider org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy not a subtype at java.util.ServiceLoader.fail(ServiceLoader.java:239) at java.util.ServiceLoader.access$300(ServiceLoader.java:185) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236) at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182) at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
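The fix amounts to treating the base plugin interface as a framework ("system") type that must always come from the parent class loader. A toy sketch of such an exclusion check follows; the method name and exclusion list are hypothetical, not Connect's actual implementation:

```java
public class Main {
    // Hypothetical exclusion list: base plugin interfaces that must be loaded
    // by the parent loader, never by an isolated plugin loader.
    private static final String[] EXCLUDED = {
        "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy"
    };

    public static boolean shouldLoadInIsolation(String className) {
        for (String excluded : EXCLUDED) {
            if (className.equals(excluded)) {
                return false; // a "system" type: delegate to the parent loader
            }
        }
        return true; // everything else may be loaded in isolation
    }

    public static void main(String[] args) {
        System.out.println(shouldLoadInIsolation("com.example.MyPolicy"));
    }
}
```

If both an isolated loader and the parent define the interface class, ServiceLoader sees providers implementing a "different" interface, which is exactly the "not a subtype" error in the stack trace above.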
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846787#comment-16846787 ] Matthias J. Sax commented on KAFKA-8412: Thanks for reporting this. What I don't understand is why we would flush after we have already closed a task. Hence, I am not sure that a null guard is the correct fix; we should rather make sure we don't call flush() in the first place. Could you provide debug-level logs? They might help to understand the scenario better. > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Priority: Minor > > I found a closed issue and replied there but decided to open one myself > because although they're related they're slightly different. The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there has been to implement a null check around closing a producer > because in some cases the producer is already null there (has been closed > already) > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close. 
This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > If I look at the source code at this point, I see a nice null check in the > close method, but not in the flush method that is called just before that: > {code:java} > public void flush() { > this.log.debug("Flushing producer"); > this.producer.flush(); > this.checkForException(); > } > public void close() { > this.log.debug("Closing producer"); > if (this.producer != null) { > this.producer.close(); > this.producer = null; > } > this.checkForException(); > }{code} > Seems to my (ignorant) eye that the flush method should also be wrapped in a > null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
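The guard the reporter suggests can be illustrated with a stripped-down stand-in for the collector (the class names here are illustrative, not the real RecordCollectorImpl or producer client):

```java
public class Main {
    // Minimal stand-in for a producer client.
    public static class Producer {
        public boolean flushed;
        public void flush() { flushed = true; }
    }

    // Stripped-down collector mirroring the flush()/close() pair in the report.
    public static class Collector {
        private Producer producer;

        public Collector(Producer producer) { this.producer = producer; }

        public void flush() {
            // Suggested fix: the same null guard that close() already has.
            if (producer != null) {
                producer.flush();
            }
        }

        public void close() {
            if (producer != null) {
                producer = null; // close() nulls the field, as in KAFKA-7678
            }
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector(new Producer());
        collector.close();
        collector.flush(); // no NullPointerException with the guard in place
    }
}
```

Whether the guard is the right fix, or whether flush() simply should not be reachable after close() (as the comment above argues), is the open question of the ticket.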
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846791#comment-16846791 ] Matthias J. Sax commented on KAFKA-8413: This should already be fixed. You need to turn on topology optimization though. Compare https://issues.apache.org/jira/browse/KAFKA-6761 Seems we can close this ticket as "invalid" ? > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png > > > Consider following code: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) >.selectKey((key, value) -> value); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > This code will generate following topology: > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-FILTER-04, KSTREAM-FILTER-08 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-FILTER-04 (stores: []) > --> KSTREAM-SINK-03 > <-- KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-FILTER-08 (stores: []) > --> KSTREAM-SINK-07 > <-- KSTREAM-KEY-SELECT-01 > Sink: KSTREAM-SINK-03 (topic: store-1-repartition) > <-- KSTREAM-FILTER-04 > Sink: KSTREAM-SINK-07 (topic: store-2-repartition) > <-- KSTREAM-FILTER-08 > Sub-topology: 1 > Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition]) > --> KSTREAM-AGGREGATE-02 > Processor: KSTREAM-AGGREGATE-02 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-05 > Sub-topology: 
2 > Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition]) > --> KSTREAM-AGGREGATE-06 > Processor: KSTREAM-AGGREGATE-06 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-09 > > {code} > Kafka Streams creates two repartition topics for each `groupByKey` operation. > In this example, two repartition topics are not really necessary and > processing can be done with one sub-topology. > > Kafka Streams user, in DSL, may specify repartition topic manually using > *KStream#through* method: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic") >.selectKey((key, value) -> value) >.through("repartition-topic"); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-SINK-02 > <-- KSTREAM-SOURCE-00 > Sink: KSTREAM-SINK-02 (topic: repartition-topic) > <-- KSTREAM-KEY-SELECT-01 > Sub-topology: 1 > Source: KSTREAM-SOURCE-03 (topics: [repartition-topic]) > --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05 > Processor: KSTREAM-AGGREGATE-04 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-03 > Processor: KSTREAM-AGGREGATE-05 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-03 > {code} > > While this gives possibility to optimizes Kafka Streams application, user > still has to manually create repartition topic with correct number of > partitions based on input topic. It would be great if in DSL we could have > something like *repartition()* operation on *KStream* which can generate > repartition topic based on user command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
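The comment above refers to the `topology.optimization` config (`StreamsConfig.TOPOLOGY_OPTIMIZATION`, set to `StreamsConfig.OPTIMIZE`, i.e. "all"). A minimal sketch of enabling it with plain string keys; the application id and bootstrap servers are placeholders:

```java
import java.util.Properties;

public class Main {
    public static Properties optimizedConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "repartition-demo");  // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // equivalent to StreamsConfig.TOPOLOGY_OPTIMIZATION -> StreamsConfig.OPTIMIZE
        props.setProperty("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(optimizedConfig().getProperty("topology.optimization"));
    }
}
```

Note that the same properties must also be passed to `StreamsBuilder#build(Properties)`; otherwise the optimizer never runs and the repartition topics are not merged.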
[jira] [Comment Edited] (KAFKA-7201) Optimize repartition operations
[ https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846795#comment-16846795 ] Bill Bejeck edited comment on KAFKA-7201 at 5/23/19 3:37 PM: - [~mjsax] Apologies for the delayed response. Yes, this is resolved KAFKA-6761 was (Author: bbejeck): [~mjsax] Apologies for the delayed response. Yes this is resolved [link KAFKA-6761|https://issues.apache.org/jira/browse/KAFKA-6761] > Optimize repartition operations > --- > > Key: KAFKA-7201 > URL: https://issues.apache.org/jira/browse/KAFKA-7201 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > When the topology has a key changing operation, any downstream processors > using the key will automatically create a repartition topic. In most cases > these multiple repartition topics can be collapsed into one repartition > operation occurring immediately after the key changing operation, thus > reducing streams overall footprint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7201) Optimize repartition operations
[ https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846795#comment-16846795 ] Bill Bejeck commented on KAFKA-7201: [~mjsax] Apologies for the delayed response. Yes this is resolved [link KAFKA-6761|https://issues.apache.org/jira/browse/KAFKA-6761] > Optimize repartition operations > --- > > Key: KAFKA-7201 > URL: https://issues.apache.org/jira/browse/KAFKA-7201 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > When the topology has a key changing operation, any downstream processors > using the key will automatically create a repartition topic. In most cases > these multiple repartition topics can be collapsed into one repartition > operation occurring immediately after the key changing operation, thus > reducing streams overall footprint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7201) Optimize repartition operations
[ https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-7201. Resolution: Fixed Resolved via https://issues.apache.org/jira/browse/KAFKA-6761 > Optimize repartition operations > --- > > Key: KAFKA-7201 > URL: https://issues.apache.org/jira/browse/KAFKA-7201 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > When the topology has a key changing operation, any downstream processors > using the key will automatically create a repartition topic. In most cases > these multiple repartition topics can be collapsed into one repartition > operation occurring immediately after the key changing operation, thus > reducing streams overall footprint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7201) Optimize repartition operations
[ https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-7201: --- Fix Version/s: 2.1.0 > Optimize repartition operations > --- > > Key: KAFKA-7201 > URL: https://issues.apache.org/jira/browse/KAFKA-7201 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 2.1.0 > > > When the topology has a key changing operation, any downstream processors > using the key will automatically create a repartition topic. In most cases > these multiple repartition topics can be collapsed into one repartition > operation occurring immediately after the key changing operation, thus > reducing streams overall footprint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7201) Optimize repartition operations
[ https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-7201: --- Affects Version/s: 2.1.0 > Optimize repartition operations > --- > > Key: KAFKA-7201 > URL: https://issues.apache.org/jira/browse/KAFKA-7201 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > When the topology has a key changing operation, any downstream processors > using the key will automatically create a repartition topic. In most cases > these multiple repartition topics can be collapsed into one repartition > operation occurring immediately after the key changing operation, thus > reducing streams overall footprint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846800#comment-16846800 ] Matthias J. Sax commented on KAFKA-7245: Yes. The classes you listed _implement_ the interface, hence they need to use `put()` with two parameters – those would all be removed when the method is removed from the interface. > Deprecate WindowStore#put(key, value) > - > > Key: KAFKA-7245 > URL: https://issues.apache.org/jira/browse/KAFKA-7245 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Omkar Mestry >Priority: Minor > Labels: needs-kip, newbie > > We want to remove `WindowStore#put(key, value)` – for this, we first need to > deprecate it via a KIP and remove it later. > Instead of using `WindowStore#put(key, value)` we need to migrate code to > specify the timestamp explicitly using `WindowStore#put(key, value, > timestamp)`. The current code base uses the explicit call to set the timestamp > in production code already. The simplified `put(key, value)` is only used in > tests, and thus, we would need to update those tests. > [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
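The deprecate-then-delegate mechanics the ticket describes can be sketched in a self-contained way. `SimpleWindowStore` below is a hypothetical stand-in, not Kafka's actual `WindowStore` API: the deprecated two-argument `put()` simply delegates to the explicit-timestamp overload, which is what lets old callers keep compiling during the deprecation period.

```java
// Sketch of the deprecate-and-delegate pattern, assuming hypothetical names;
// this is not Kafka's real WindowStore interface.
import java.util.HashMap;
import java.util.Map;

public class SimpleWindowStore {
    private final Map<String, String> store = new HashMap<>();
    private long lastTimestamp = -1L;

    /** Deprecated overload: kept so existing callers compile while they migrate. */
    @Deprecated
    public void put(String key, String value) {
        // Delegate to the preferred overload, filling in a default timestamp.
        put(key, value, System.currentTimeMillis());
    }

    /** Preferred overload: the caller supplies the timestamp explicitly. */
    public void put(String key, String value, long timestamp) {
        store.put(key, value);
        lastTimestamp = timestamp;
    }

    public String get(String key) {
        return store.get(key);
    }

    public long lastTimestamp() {
        return lastTimestamp;
    }
}
```

Once all callers use the three-argument form, the deprecated overload can be deleted without touching the rest of the class.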
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846808#comment-16846808 ] Sebastiaan commented on KAFKA-8412: --- [~mjsax] I can try to reproduce it in development some more, but so far we've only seen it in production. But my theory is that it is similar to the other ticket; a comment https://issues.apache.org/jira/browse/KAFKA-7678?focusedCommentId=16715220&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16715220 says: _"There are one or two edge cases which can cause record collector to be closed multiple times, we have noticed them recently and are thinking about cleanup the classes along the calling hierarchy (i.e. from Task Manager -> Task -> RecordCollector) for it. One example is:_ _1) a task is *suspended*, with EOS turned on (like your case), the record collector is closed()._ _2) then the instance got killed (SIGTERM) , which causes all threads to be closed, which will then cause all their owned tasks to be *closed*. The same record collector close() call will be triggered again"_ So this could be the same issue, but now for flush rather than close: the producer is already flushed and closed, but the same thing is tried again. Of course I don't know anything about the internals of the client, so take this with a grain of salt. > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Priority: Minor > > I found a closed issue and replied there but decided to open one myself > because, although they're related, they're slightly different. 
The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there was to implement a null check around closing a producer, > because in some cases the producer is already null there (it has been closed > already). > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close. This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > If I look at the source code at this point, I see a nice null check in the > close method, but not in the flush method that is called just before that: > {code:java} > public void flush() { > this.log.debug("Flush
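The truncated snippet above is the point of the report: `close()` guards against a null producer but `flush()` does not. A minimal self-contained sketch of the guarded version follows; `RecordCollectorSketch` and its field are hypothetical stand-ins, not the real `RecordCollectorImpl` internals.

```java
// Sketch of the missing null guard described in the report. The producer
// field is a stand-in (a StringBuilder) for the real Kafka producer that
// close() sets to null.
public class RecordCollectorSketch {
    private StringBuilder producer = new StringBuilder();

    public void flush() {
        if (producer == null) {
            // Guard analogous to the existing one in close(); without it,
            // flushing after close would throw a NullPointerException.
            return;
        }
        producer.append("flushed;");
    }

    public void close() {
        if (producer != null) { // the guard that already exists in close()
            producer = null;
        }
    }

    public boolean isClosed() {
        return producer == null;
    }
}
```

With the guard in place, the close-then-flush sequence from the shutdown path becomes a no-op instead of an NPE.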
[jira] [Commented] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation
[ https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846816#comment-16846816 ] ASF GitHub Bot commented on KAFKA-8415: --- kkonstantine commented on pull request #6796: KAFKA-8415: Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation URL: https://github.com/apache/kafka/pull/6796 * New Connect plugin interface ConnectorClientConfigOverridePolicy needs to be excluded from the class loading isolation * Added missing unit tests similar to the ones existing for previous plugins ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Interface ConnectorClientConfigOverridePolicy needs to be excluded from class > loading isolation > --- > > Key: KAFKA-8415 > URL: https://issues.apache.org/jira/browse/KAFKA-8415 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 2.3.0 > > > Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} > were recently added in Connect as plugins that can be loaded in class loading > isolation. > However the interface itself was not excluded from isolation, which > results in definition conflicts. Any interface that is considered a base > Connect plugin interface needs to be excluded from isolation itself (it's > considered a "system" type). 
> Here's the exception: > {code:java} > [2019-05-23 15:16:57,802] ERROR Stopping due to error > (org.apache.kafka.connect.cli.ConnectDistributed:84) > java.util.ServiceConfigurationError: > org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: > Provider > org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy > not a subtype at > java.util.ServiceLoader.fail(ServiceLoader.java:239) at > java.util.ServiceLoader.access$300(ServiceLoader.java:185) at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) > at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) > at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
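The "not a subtype" error arises because class identity in Java is the pair (class name, defining classloader): if the base interface is loaded once by the plugin's isolated loader and once by the parent, implementations of one are not subtypes of the other. A hypothetical simplification of the fix is sketched below; it is an illustration of the exclusion idea, not Connect's exact `PluginUtils` logic.

```java
// Sketch: base plugin interfaces are treated as "system" classes that the
// delegating classloader must NOT load in isolation, so every plugin sees
// the same Class object for the interface. The class name checked here is
// the one from the ticket; the method name is an assumption.
public class IsolationExclusions {
    public static boolean shouldLoadInIsolation(String className) {
        // The base policy interface must come from the parent loader;
        // otherwise ServiceLoader fails with "Provider ... not a subtype".
        return !className.equals(
            "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy");
    }
}
```

Concrete policy implementations still load in isolation; only the shared interface is delegated to the parent.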
[jira] [Updated] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation
[ https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-8415: -- Description: Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} were recently added in Connect as plugins that can be loaded in class loading isolation. However the interface itself was not excluded from isolation, which results in definition conflicts. Any interface that is considered a base Connect plugin interface needs to be excluded from isolation itself (it's considered a "system" type). Here's the exception:
{code:java}
[2019-05-23 15:16:57,802] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:84)
java.util.ServiceConfigurationError: org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: Provider org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy not a subtype
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
    at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
{code}
> Interface ConnectorClientConfigOverridePolicy needs to be excluded from class > loading isolation > --- > > Key: KAFKA-8415 > URL: https://issues.apache.org/jira/browse/KAFKA-8415 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Konstant
[jira] [Assigned] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms
[ https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Brandt Newton reassigned KAFKA-7760: Assignee: (was: Brandt Newton) > Add broker configuration to set minimum value for segment.bytes and segment.ms > -- > > Key: KAFKA-7760 > URL: https://issues.apache.org/jira/browse/KAFKA-7760 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Priority: Major > Labels: kip, newbie > > If someone sets segment.bytes or segment.ms at the topic level to a very small > value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a > very high number of segment files. This can bring down the whole broker due > to hitting the maximum number of open files (for logs) or the maximum number of mmap-ed > files (for indexes). > To prevent that from happening, I would like to suggest adding two new items > to the broker configuration: > * min.topic.segment.bytes, defaults to 1048576: The minimum value for > segment.bytes. When someone sets topic configuration segment.bytes to a value > lower than this, Kafka throws an error INVALID VALUE. > * min.topic.segment.ms, defaults to 360: The minimum value for > segment.ms. When someone sets topic configuration segment.ms to a value lower > than this, Kafka throws an error INVALID VALUE. > Thanks > Badai -- This message was sent by Atlassian JIRA (v7.6.3#76005)
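The proposed broker-side check could look roughly like the sketch below. The class and method names are hypothetical; only the suggested 1048576-byte minimum for segment.bytes comes from the ticket.

```java
// Sketch of the proposed minimum-value validation for topic-level
// segment.bytes. Names are assumptions, not Kafka's config machinery.
public class SegmentConfigValidator {
    // Proposed default for min.topic.segment.bytes (1 MiB), per the ticket.
    static final long MIN_SEGMENT_BYTES = 1048576L;

    /** Returns the value if acceptable; throws for dangerously small values. */
    public static long validateSegmentBytes(long segmentBytes) {
        if (segmentBytes < MIN_SEGMENT_BYTES) {
            throw new IllegalArgumentException(
                "INVALID VALUE: segment.bytes=" + segmentBytes
                    + " is below the broker minimum " + MIN_SEGMENT_BYTES);
        }
        return segmentBytes;
    }
}
```

The same shape would apply to segment.ms with its own broker-level floor.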
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838 ] Levani Kokhreidze commented on KAFKA-8413: -- Hi Matthias, I've enabled topology optimization, but in this particular example, there're still 2 repartition topics created. {code:java} Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code} Kafka Streams version 2.2.0 Topics created: {code:java} 867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog 867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition 867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog 867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition{code} Actually, one other reason why introducing an additional manual repartition operation may be valuable - correct me if I'm wrong, but Kafka Streams will try to optimize when a key operation is followed by a stateful operation, like *groupByKey().aggregate(...)*, but there may be cases where, in the DSL, a user is using a stateful *transform(...)* operation for aggregation. Consider the following example: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String())) .selectKey((key, value) -> value); streamByProfileId.transform(...) // stateful transformer with aggregation streamByProfileId.transform(...) 
// stateful transformer with aggregation {code} In this example no repartition topic is created; on the other hand, if we had something like a `repartition()` operation on KStream, we could write something like this, which would be pretty cool imho: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String())) .repartitionBy((key, value) -> new_key); streamByProfileId.transform(...) // stateful transformer with aggregation streamByProfileId.transform(...) // stateful transformer with aggregation{code} > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png > > > Consider the following code: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) >.selectKey((key, value) -> value); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > This code will generate the following topology: > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-FILTER-04, KSTREAM-FILTER-08 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-FILTER-04 (stores: []) > --> KSTREAM-SINK-03 > <-- KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-FILTER-08 (stores: []) > --> KSTREAM-SINK-07 > <-- KSTREAM-KEY-SELECT-01 > Sink: KSTREAM-SINK-03 (topic: store-1-repartition) > <-- KSTREAM-FILTER-04 > Sink: KSTREAM-SINK-07 (topic: store-2-repartition) > <-- KSTREAM-FILTER-08 > Sub-topology: 1 > 
Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition]) > --> KSTREAM-AGGREGATE-02 > Processor: KSTREAM-AGGREGATE-02 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-05 > Sub-topology: 2 > Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition]) > --> KSTREAM-AGGREGATE-06 > Processor: KSTREAM-AGGREGATE-06 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-09 > > {code} > Kafka Streams creates two repartition topics for each `groupByKey` operation. > In this example, two repartition topics are not really necessary and > processing can be done with one sub-topology. > > Kafka Streams user, in DSL, may specify repartition topic manually using > *KStream#through* method: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic") >.selectKey((key, value) -> value) >.through("repartition-topic"); >
[jira] [Comment Edited] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838 ] Levani Kokhreidze edited comment on KAFKA-8413 at 5/23/19 4:28 PM: --- Hi Matthias, I've enabled topology optimization, but in this particular example, there're still 2 repartition topics created (Kafka Streams version 2.2.0). {code:java} Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code} Topics created: {code:java} 867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog 867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition 867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog 867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition {code} Actually, one other reason why introducing an additional manual repartition operation may be valuable - correct me if I'm wrong, but Kafka Streams will try to optimize when a key operation is followed by a stateful operation, like *groupByKey().aggregate(...)*, but there may be cases where, in the DSL, a user is using a stateful *transform(...)* operation for aggregation. Consider the following example: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String())) .selectKey((key, value) -> value); streamByProfileId.transform(...) // stateful transformer with aggregation streamByProfileId.transform(...) 
// stateful transformer with aggregation {code} In this example no repartition topic is created; on the other hand, if we had something like a `repartition()` operation on KStream, we could write something like this, which would be pretty cool imho: {code:java} final KStream streamByProfileId = streamsBuilder .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String())) .repartitionBy((key, value) -> new_key); streamByProfileId.transform(...) // stateful transformer with aggregation streamByProfileId.transform(...) // stateful transformer with aggregation{code} > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png > > > Consider the following code: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) >.selectKey((
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846843#comment-16846843 ] Ted Yu commented on KAFKA-5998: --- Can you move the state directory outside of /tmp, which is subject to cleaning by the OS? > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams instance running... I have been running them > for two days under a load of around 2500 msgs per second. On the third day I am > getting the below exception for some of the partitions. I have 16 partitions; only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.S
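Ted Yu's suggestion amounts to pointing the Streams state directory at a path that tmp cleaners leave alone. In the sketch below, `state.dir` is the real Streams config key; the helper method and the example path are assumptions for illustration.

```java
// Sketch: build Streams properties with a persistent state directory,
// rejecting paths under /tmp that an OS tmp cleaner may delete.
import java.util.Properties;

public class StateDirConfig {
    public static Properties withPersistentStateDir(Properties props, String dir) {
        if (dir.startsWith("/tmp")) {
            throw new IllegalArgumentException(
                "state.dir under /tmp can be deleted by the OS tmp cleaner");
        }
        props.setProperty("state.dir", dir); // real Kafka Streams config key
        return props;
    }
}
```

Note the reporter's paths are already under /data, so in that case the /tmp cleaner is not the culprit; the check simply encodes the general advice from the comment.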
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846844#comment-16846844 ] Matthias J. Sax commented on KAFKA-8413: {quote}I've enabled topology optimization, but in this particular example, there're still 2 repartition topics created (Kafka Streams version 2.2.0). {quote} \cc [~bbejeck] – Can you look into this? This would be a bug. About the other request: I agree that this might be helpful, and in fact there is a similar ticket, including a KIP draft for this: * https://issues.apache.org/jira/browse/KAFKA-6037 * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint] The KIP is inactive, so feel free to pick it up. I would not add a `repartition()` operation though, but stick with `through()` and make the topic name optional to let Kafka Streams manage the topic. > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png > > > Consider the following code: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) >.selectKey((key, value) -> value); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > This code will generate the following topology: > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-FILTER-04, KSTREAM-FILTER-08 > <-- KSTREAM-SOURCE-00 > 
Processor: KSTREAM-FILTER-04 (stores: []) > --> KSTREAM-SINK-03 > <-- KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-FILTER-08 (stores: []) > --> KSTREAM-SINK-07 > <-- KSTREAM-KEY-SELECT-01 > Sink: KSTREAM-SINK-03 (topic: store-1-repartition) > <-- KSTREAM-FILTER-04 > Sink: KSTREAM-SINK-07 (topic: store-2-repartition) > <-- KSTREAM-FILTER-08 > Sub-topology: 1 > Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition]) > --> KSTREAM-AGGREGATE-02 > Processor: KSTREAM-AGGREGATE-02 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-05 > Sub-topology: 2 > Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition]) > --> KSTREAM-AGGREGATE-06 > Processor: KSTREAM-AGGREGATE-06 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-09 > > {code} > Kafka Streams creates a repartition topic for each `groupByKey` operation, so two in this example. > These two repartition topics are not really necessary, and > processing can be done with one sub-topology. > > In the DSL, a Kafka Streams user may specify the repartition topic manually using the > *KStream#through* method: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic") >.selectKey((key, value) -> value) >.through("repartition-topic"); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-SINK-02 > <-- KSTREAM-SOURCE-00 > Sink: KSTREAM-SINK-02 (topic: repartition-topic) > <-- KSTREAM-KEY-SELECT-01 > Sub-topology: 1 > Source: KSTREAM-SOURCE-03 (topics: [repartition-topic]) > --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05 > Processor: KSTREAM-AGGREGATE-04 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-03 > 
Processor: KSTREAM-AGGREGATE-05 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-03 > {code} > > While this makes it possible to optimize a Kafka Streams application, the user > still has to manually create the repartition topic with the correct number of > partitions based on the input topic. It would be great if in DSL we could have > something like *repartition()* operation on *KStream* which can generate a > repartition topic based on user command.
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846847#comment-16846847 ] Levani Kokhreidze commented on KAFKA-8413: -- Thanks. I'll look into it. > Add possibility to do repartitioning on KStream -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846851#comment-16846851 ] Matthias J. Sax commented on KAFKA-8412: Do you run with EOS enabled? Also, closing() and flushing() are a little different... I am not saying there is no issue; I am just trying to figure out what the correct fix is. Just adding a `null`-check could mask the root cause of the bug rather than fix it. > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Priority: Minor > > I found a closed issue and replied there, but decided to open one myself > because although they're related, they're slightly different. The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there was to implement a null check around closing the producer, > because in some cases the producer is already null at that point (it has > already been closed). > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close.
This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > If I look at the source code at this point, I see a nice null check in the > close method, but not in the flush method that is called just before that: > {code:java} > public void flush() { > this.log.debug("Flushing producer"); > this.producer.flush(); > this.checkForException(); > } > public void close() { > this.log.debug("Closing producer"); > if (this.producer != null) { > this.producer.close(); > this.producer = null; > } > this.checkForException(); > }{code} > Seems to my (ignorant) eye that the flush method should also be wrapped in a > null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
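The guard the reporter proposes can be sketched in isolation. Everything below is illustrative: the `Producer` interface is a stand-in for the real Kafka producer, and this is not the actual `RecordCollectorImpl` code.

```java
// Illustrative sketch of the proposed fix: flush() gets the same null
// guard that close() already has, so a flush after the producer has been
// closed (and nulled out) becomes a no-op instead of throwing a
// NullPointerException.
public class RecordCollectorSketch {
    interface Producer {
        void flush();
        void close();
    }

    private Producer producer;

    public RecordCollectorSketch(Producer producer) {
        this.producer = producer;
    }

    public void flush() {
        if (this.producer != null) { // proposed guard, mirroring close()
            this.producer.flush();
        }
    }

    public void close() {
        if (this.producer != null) { // existing guard added for KAFKA-7678
            this.producer.close();
            this.producer = null;
        }
    }

    public boolean isClosed() {
        return this.producer == null;
    }

    public static void main(String[] args) {
        // Simulate the shutdown ordering from the stack trace: the producer
        // is already gone when flush() is called.
        RecordCollectorSketch collector = new RecordCollectorSketch(null);
        collector.flush(); // no NPE with the guard in place
        collector.close();
        System.out.println("flush after close is a no-op");
    }
}
```

As Matthias's comment notes, whether such a guard is the right fix or merely masks the root cause is exactly the open question in this ticket.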
[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts
[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846857#comment-16846857 ] Matthias J. Sax commented on KAFKA-7994: I think it might actually be worth splitting this into two tickets: one ticket to just preserve partition time over restarts (the original issue of this ticket), and a new ticket for global stream time. There are still many open questions about what global stream time actually means, and it will be a difficult and long design phase until we can merge any code. Hence, I would like to unblock the original issue of this ticket. Thoughts? > Improve Stream-Time for rebalances and restarts > --- > > Key: KAFKA-7994 > URL: https://issues.apache.org/jira/browse/KAFKA-7994 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Attachments: possible-patch.diff > > > We compute a per-partition partition-time as the maximum timestamp over all > records processed so far. Furthermore, we use partition-time to compute > stream-time for each task as maximum over all partition-times (for all > corresponding task partitions). This stream-time is used to make decisions > about processing out-of-order records or dropping them if they are late (ie, > timestamp < stream-time - grace-period). > During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, > -1) for tasks that are newly created (or migrated). In net effect, we forget > the current stream-time in this case, which may lead to non-deterministic behavior > if we stop processing right before a late record that would be dropped if we > continued processing, but is not dropped after rebalance/restart. 
Let's look > at an example with a grace period of 5ms for a tumbling window of 5ms, and > the following records (timestamps in parentheses): > > {code:java} > r1(0) r2(5) r3(11) r4(2){code} > In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is > dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or > rebalance after processing `r3` but before processing `r4`, we would > reinitialize stream-time as -1, and thus would process `r4` on restart/after > rebalance. The problem is that stream-time then advances differently from a > global point of view: 0, 5, 11, 2. > Note, this is a corner case, because if we stopped processing one record > earlier, ie, after processing `r2` but before processing `r3`, stream-time > would advance correctly from a global point of view. > A potential fix would be to store the latest observed partition-time in the > metadata of committed offsets. That way, on restart/rebalance we can > re-initialize time correctly. > Notice that this particular issue applies to all Stream Tasks in the > topology. The further down the DAG records flow, the more likely it is that > the StreamTask will have an incorrect stream time. For instance, if r3 was > filtered out, the tasks receiving the processed records will compute the > stream time as 5 instead of the correct value of 11. This requires us > to also propagate the latest observed partition time downstream. > That means the sources located at the head of the topology must forward the > partition time to their subtopologies whenever records are sent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
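The late-record rule in the example above (drop if timestamp < stream-time - grace) can be traced with a small stand-alone sketch. This is illustrative only, not Kafka Streams code:

```java
import java.util.List;

// Traces how stream-time advances over r1(0) r2(5) r3(11) r4(2) and when a
// record is dropped as late, following the rule from the ticket:
// late if timestamp < stream-time - grace. Illustrative sketch only.
public class StreamTimeDemo {
    static final long GRACE_MS = 5;

    // Returns true if the record would be dropped as late.
    static boolean isLate(long timestamp, long streamTime) {
        return timestamp < streamTime - GRACE_MS;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(0L, 5L, 11L, 2L); // r1..r4
        long streamTime = -1; // UNKNOWN, as after a restart/rebalance
        for (long ts : timestamps) {
            if (isLate(ts, streamTime)) {
                System.out.println("drop late record (" + ts + ")");
            } else {
                streamTime = Math.max(streamTime, ts);
                System.out.println("process (" + ts + "), stream-time=" + streamTime);
            }
        }
        // r4(2) is dropped because 2 < 11 - 5. But if stream-time were reset
        // to -1 just before r4 (restart between r3 and r4), isLate(2, -1)
        // would be false and r4 would incorrectly be processed -- the
        // non-determinism the ticket describes.
    }
}
```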
[jira] [Commented] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error
[ https://issues.apache.org/jira/browse/KAFKA-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846900#comment-16846900 ] ASF GitHub Bot commented on KAFKA-8341: --- soondenana commented on pull request #6723: KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error URL: https://github.com/apache/kafka/pull/6723 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > AdminClient should retry coordinator lookup after NOT_COORDINATOR error > --- > > Key: KAFKA-8341 > URL: https://issues.apache.org/jira/browse/KAFKA-8341 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Vikas Singh >Priority: Major > > If a group operation (e.g. DescribeGroup) fails because the coordinator has > moved, the AdminClient should lookup the coordinator before retrying the > operation. Currently we will either fail or just retry anyway. This is > similar in some ways to controller rediscovery after getting NOT_CONTROLLER > errors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error
[ https://issues.apache.org/jira/browse/KAFKA-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846904#comment-16846904 ] ASF GitHub Bot commented on KAFKA-8341: --- soondenana commented on pull request #6723: KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error URL: https://github.com/apache/kafka/pull/6723 An API call for consumer groups is made up of two calls: 1. Find the consumer group coordinator. 2. Send the request to the node found in step 1. But the coordinator can move between step 1 and step 2, in which case we currently fail. This change fixes that by detecting the error and retrying. The following APIs are affected by this behavior: 1. listConsumerGroupOffsets 2. deleteConsumerGroups 3. describeConsumerGroups Each of these calls results in AdminClient making multiple calls to the backend. As the AdminClient code invokes each backend API in a separate event loop, the code that detects the error (step 2) needs to restart the whole operation, including step 1. This required a change to capture the "Call" object for step 1 in step 2. This change thus refactors the code to make it easy to retry the whole operation. It creates a Context object to capture the API arguments that can then be referred to by each "Call" object. This is just for convenience and makes method signatures simpler, as we only need to pass one object instead of multiple API arguments. The creation of each "Call" object is done in a new method, so we can easily resubmit step 1 from step 2. This change also modifies the corresponding unit tests to cover this scenario. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > AdminClient should retry coordinator lookup after NOT_COORDINATOR error -- This message was sent by Atlassian JIRA (v7.6.3#76005)
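The two-step pattern and the retry loop the PR describes can be sketched generically. The `Broker`, `Result`, and `Error` types below are hypothetical stand-ins, not the actual AdminClient internals; the point is only that the lookup (step 1) is redone on every retry:

```java
// Generic sketch of the retry the PR describes: if the coordinator moves
// between the lookup (step 1) and the actual request (step 2), restart
// the whole operation, lookup included. All types here are illustrative
// stand-ins for the real AdminClient machinery.
public class CoordinatorRetrySketch {
    enum Error { NONE, NOT_COORDINATOR }

    record Result(Error error, String value) {}

    interface Broker {
        String findCoordinator();                        // step 1
        Result send(String coordinator, String request); // step 2
    }

    static String callWithRetry(Broker broker, String request, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            String coordinator = broker.findCoordinator(); // redone on every retry
            Result r = broker.send(coordinator, request);
            if (r.error() != Error.NOT_COORDINATOR) {
                return r.value();
            }
            // Coordinator moved between step 1 and step 2: retry the
            // whole operation, not just the send.
        }
        throw new IllegalStateException("coordinator still moving after retries");
    }
}
```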
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846905#comment-16846905 ] Bill Bejeck commented on KAFKA-8413: Hi [~lkokhreidze], thanks for reporting this. When you build the topology, can you confirm for me that you are calling {{StreamsBuilder#build(properties)}}? To optimize the topology you need to pass the properties to {{StreamsBuilder#build}} as well as set the option in the configuration. Thanks, Bill > Add possibility to do repartitioning on KStream -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846916#comment-16846916 ] Levani Kokhreidze commented on KAFKA-8413: -- Hi [~bbejeck], I wasn't passing the properties to StreamsBuilder before; I missed that part. I can confirm that after applying your suggestion there's only one repartition topic. Sorry about the confusion. > Add possibility to do repartitioning on KStream -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8416) Improve Documentation for Enabling Optimizations
[ https://issues.apache.org/jira/browse/KAFKA-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-8416: --- Component/s: streams > Improve Documentation for Enabling Optimizations > > > Key: KAFKA-8416 > URL: https://issues.apache.org/jira/browse/KAFKA-8416 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Priority: Major > > To enable optimizations, users need to set the > {{StreamsConfig.TOPOLOGY_OPTIMIZATION}} setting to "all". But in addition to > setting the config, users need to pass the {{Properties}} object to the > {{StreamsBuilder#build()}} method as well. > > We should make a pass over the existing documentation and Javadoc to make > sure this required step is clear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8416) Improve Documentation for Enabling Optimizations
[ https://issues.apache.org/jira/browse/KAFKA-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-8416: --- Labels: newbie, (was: ) > Improve Documentation for Enabling Optimizations -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8416) Improve Documentation for Enabling Optimizations
Bill Bejeck created KAFKA-8416: -- Summary: Improve Documentation for Enabling Optimizations Key: KAFKA-8416 URL: https://issues.apache.org/jira/browse/KAFKA-8416 Project: Kafka Issue Type: Improvement Reporter: Bill Bejeck To enable optimizations, users need to set the {{StreamsConfig.TOPOLOGY_OPTIMIZATION}} setting to "all". But in addition to setting the config, users need to pass the {{Properties}} object to the {{StreamsBuilder#build()}} method as well. We should make a pass over the existing documentation and Javadoc to make sure this required step is clear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
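The two-step enablement the ticket describes looks roughly like this (a configuration sketch against the Kafka Streams 2.x API; topic and application names are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class OptimizedTopologyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Step 1: set the config ("all" via the StreamsConfig.OPTIMIZE constant).
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .selectKey((key, value) -> value)
               .groupByKey()
               .count();

        // Step 2 (the easy-to-miss part): pass the SAME properties to build().
        // Calling builder.build() without them means the optimizer never runs.
        Topology topology = builder.build(props);
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
{code}

This is exactly the step Levani missed in KAFKA-8413: the config was set, but the properties were not passed to build().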
[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846927#comment-16846927 ] Bill Bejeck commented on KAFKA-8413: Hi [~lkokhreidze], No problem at all! It's a subtle point and we could probably do a better job of making sure of that step isn't overlooked. I've created https://issues.apache.org/jira/browse/KAFKA-8416 to help improve the documentation for enabling optimizations. I'll go ahead and close this ticket then. > Add possibility to do repartitioning on KStream > --- > > Key: KAFKA-8413 > URL: https://issues.apache.org/jira/browse/KAFKA-8413 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Priority: Minor > Attachments: topology-1.png, topology-2.png > > > Consider following code: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) >.selectKey((key, value) -> value); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > This code will generate following topology: > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-FILTER-04, KSTREAM-FILTER-08 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-FILTER-04 (stores: []) > --> KSTREAM-SINK-03 > <-- KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-FILTER-08 (stores: []) > --> KSTREAM-SINK-07 > <-- KSTREAM-KEY-SELECT-01 > Sink: KSTREAM-SINK-03 (topic: store-1-repartition) > <-- KSTREAM-FILTER-04 > Sink: KSTREAM-SINK-07 (topic: store-2-repartition) > <-- KSTREAM-FILTER-08 > Sub-topology: 1 > Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition]) > --> 
KSTREAM-AGGREGATE-02 > Processor: KSTREAM-AGGREGATE-02 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-05 > Sub-topology: 2 > Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition]) > --> KSTREAM-AGGREGATE-06 > Processor: KSTREAM-AGGREGATE-06 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-09 > > {code} > Kafka Streams creates two repartition topics for each `groupByKey` operation. > In this example, two repartition topics are not really necessary and > processing can be done with one sub-topology. > > Kafka Streams user, in DSL, may specify repartition topic manually using > *KStream#through* method: > {code:java} > final KStream streamByProfileId = streamsBuilder >.stream("input-topic") >.selectKey((key, value) -> value) >.through("repartition-topic"); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-1") >); > streamByProfileId >.groupByKey() >.aggregate( > () -> 0d, > (key, value, aggregate) -> aggregate, > Materialized.as("store-2") >); > {code} > > > {code:java} > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [input-topic]) > --> KSTREAM-KEY-SELECT-01 > Processor: KSTREAM-KEY-SELECT-01 (stores: []) > --> KSTREAM-SINK-02 > <-- KSTREAM-SOURCE-00 > Sink: KSTREAM-SINK-02 (topic: repartition-topic) > <-- KSTREAM-KEY-SELECT-01 > Sub-topology: 1 > Source: KSTREAM-SOURCE-03 (topics: [repartition-topic]) > --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05 > Processor: KSTREAM-AGGREGATE-04 (stores: [store-1]) > --> none > <-- KSTREAM-SOURCE-03 > Processor: KSTREAM-AGGREGATE-05 (stores: [store-2]) > --> none > <-- KSTREAM-SOURCE-03 > {code} > > While this gives possibility to optimizes Kafka Streams application, user > still has to manually create repartition topic with correct number of > partitions based on input topic. 
It would be great if the DSL had > something like a *repartition()* operation on *KStream* that could generate > the repartition topic based on user input. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
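The co-partitioning requirement that drives these repartition topics can be illustrated with a small standalone sketch (Python, illustrative only; the crc32-based partitioner and `num_partitions` are assumptions for the example, not Kafka's actual default partitioner): after `selectKey` changes the key, records must be redistributed so equal keys land in the same partition before any per-key aggregation can run, and a single redistribution can feed any number of downstream aggregations, which is why one shared repartition topic suffices.

```python
import zlib

def repartition(records, num_partitions):
    """Route (key, value) records to partitions by a stable hash of the key,
    so that all records with an equal key end up in the same partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        idx = zlib.crc32(str(key).encode()) % num_partitions
        partitions[idx].append((key, value))
    return partitions

# After one redistribution, any number of per-key aggregations can consume
# the same partitioned data -- the point of sharing one repartition topic.
records = [("profile-1", 10), ("profile-2", 20), ("profile-1", 30)]
parts = repartition(records, num_partitions=4)
per_key_counts = {}
for part in parts:
    for key, _ in part:
        per_key_counts[key] = per_key_counts.get(key, 0) + 1
```

Both aggregations in the example above could run against `parts` without re-shuffling the data again, which is the saving the reporter is after.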
[jira] [Resolved] (KAFKA-8413) Add possibility to do repartitioning on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-8413. Resolution: Not A Problem -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers
Konstantine Karantasis created KAFKA-8417: - Summary: Remove redundant network definition --net=host when starting testing docker containers Key: KAFKA-8417 URL: https://issues.apache.org/jira/browse/KAFKA-8417 Project: Kafka Issue Type: Bug Components: system tests Reporter: Konstantine Karantasis Assignee: Konstantine Karantasis The switches {{--net}} and {{--network}} are equivalent in docker, with the latter being preferred. (see [https://github.com/docker/cli/blob/master/cli/command/container/opts.go] where currently there's the comment: _// We allow for both "--net" and "--network", although the latter is the recommended way._) However, in recent Docker versions, defining both as follows: {{--net=host --network ducknet}} fails with error: {{docker: conflicting options: cannot attach both user-defined and non-user-defined network-modes.}} Removing {{--net=host}} and keeping only the user-defined network should fix the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
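The intent of the fix can be sketched in a few lines (Python; `docker_network_args` is a hypothetical helper written for this illustration, not code from the Kafka system tests): when a user-defined network is requested, emit only `--network`, and never combine it with `--net=host`.

```python
def docker_network_args(user_network=None):
    """Build the `docker run` network flags, avoiding the conflicting
    combination of --net=host with a user-defined --network."""
    if user_network:
        return ["--network", user_network]
    # Fall back to host networking only when no user-defined network is given.
    return ["--net", "host"]

# e.g. command = ["docker", "run"] + docker_network_args("ducknet") + [image]
```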
[jira] [Commented] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers
[ https://issues.apache.org/jira/browse/KAFKA-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846948#comment-16846948 ] ASF GitHub Bot commented on KAFKA-8417: --- kkonstantine commented on pull request #6797: KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers URL: https://github.com/apache/kafka/pull/6797 * Remove non-user-defined network --net=host which is redundant when starting system test docker containers * Tested by running a round of system tests locally and on jenkins ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers
[ https://issues.apache.org/jira/browse/KAFKA-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846953#comment-16846953 ] ASF GitHub Bot commented on KAFKA-8417: --- cmccabe commented on pull request #6797: KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers URL: https://github.com/apache/kafka/pull/6797 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8309) KIP-465: Add Consolidated Connector Endpoint to Connect REST API
[ https://issues.apache.org/jira/browse/KAFKA-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8309. -- Resolution: Fixed Assignee: dan norwood Reviewer: Randall Hauch Fix Version/s: 2.3.0 Not sure why this was not closed when the PR was merged, but it's resolved now. > KIP-465: Add Consolidated Connector Endpoint to Connect REST API > > > Key: KAFKA-8309 > URL: https://issues.apache.org/jira/browse/KAFKA-8309 > Project: Kafka > Issue Type: Improvement >Reporter: dan norwood >Assignee: dan norwood >Priority: Major > Fix For: 2.3.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Diachenko updated KAFKA-8418: --- Component/s: KafkaConnect > Connect System tests are not waiting for REST resources to be registered > > > Key: KAFKA-8418 > URL: https://issues.apache.org/jira/browse/KAFKA-8418 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
Oleksandr Diachenko created KAFKA-8418: -- Summary: Connect System tests are not waiting for REST resources to be registered Key: KAFKA-8418 URL: https://issues.apache.org/jira/browse/KAFKA-8418 Project: Kafka Issue Type: Bug Reporter: Oleksandr Diachenko Assignee: Oleksandr Diachenko -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Diachenko updated KAFKA-8418: --- Affects Version/s: 2.2.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Diachenko updated KAFKA-8418: --- Fix Version/s: 2.3.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation
[ https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846985#comment-16846985 ] ASF GitHub Bot commented on KAFKA-8415: --- rhauch commented on pull request #6796: KAFKA-8415: Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation URL: https://github.com/apache/kafka/pull/6796 > Interface ConnectorClientConfigOverridePolicy needs to be excluded from class > loading isolation > --- > > Key: KAFKA-8415 > URL: https://issues.apache.org/jira/browse/KAFKA-8415 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 2.3.0 > > > Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} > were recently added in Connect as plugins that can be loaded in class loading > isolation. > However the interface itself was not excluded from isolation, which > results in definition conflicts. Any interface that is considered a base > Connect plugin interface needs to be excluded by isolation itself (it's > considered a "system" type). 
> Here's the exception: > {code:java} > [2019-05-23 15:16:57,802] ERROR Stopping due to error > (org.apache.kafka.connect.cli.ConnectDistributed:84) > java.util.ServiceConfigurationError: > org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: > Provider > org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy > not a subtype > at java.util.ServiceLoader.fail(ServiceLoader.java:239) > at java.util.ServiceLoader.access$300(ServiceLoader.java:185) > at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) > at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) > at java.util.ServiceLoader$1.next(ServiceLoader.java:480) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182) > at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
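The "not a subtype" failure above happens because the same interface gets defined twice by different classloaders. A rough Python analogue (illustrative only; Python has modules rather than classloaders, and the name `Policy` is a stand-in for `ConnectorClientConfigOverridePolicy`) shows how two separately loaded copies of a class with the same name are distinct types, so an instance of one is not an instance of the other:

```python
import types

# Source for a stand-in "plugin interface".
SOURCE = "class Policy:\n    pass\n"

def load_fresh(module_name):
    """Execute the same source into a brand-new module object, the way an
    isolating classloader re-defines a class instead of delegating to its
    parent loader."""
    module = types.ModuleType(module_name)
    exec(SOURCE, module.__dict__)
    return module

framework = load_fresh("framework_view")
plugin = load_fresh("plugin_view")

instance = plugin.Policy()
print(isinstance(instance, plugin.Policy))     # True
print(isinstance(instance, framework.Policy))  # False: same name, different type
```

This mirrors why the fix is to treat the interface as a "system" type loaded once by the parent loader, so the framework and every plugin agree on a single class object.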
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Diachenko updated KAFKA-8418: --- Description: I am getting an error while running Kafka tests: {code} Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == \{self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins return self._rest('/connector-plugins/', node=node) File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, resp.url) ConnectRestError {code} > Connect System tests are not waiting for REST resources to be registered > > > Key: KAFKA-8418 > URL: https://issues.apache.org/jira/browse/KAFKA-8418 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > Fix For: 2.3.0 > > > I am getting an error while running Kafka tests: > {code} > Traceback (most recent call last): File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", > line 132, in run data = 
self.run_test() File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", > line 189, in run_test return self.test_context.function(self.test) File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", > line 89, in test_rest_api assert set([connector_plugin['class'] for > connector_plugin in self.cc.list_connector_plugins()]) == > \{self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", > line 218, in list_connector_plugins return self._rest('/connector-plugins/', > node=node) File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", > line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, > resp.url) ConnectRestError > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Diachenko updated KAFKA-8418: --- Description: I am getting an error while running Kafka tests: {code} Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == \{self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins return self._rest('/connector-plugins/', node=node) File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, resp.url) ConnectRestError {code} >From the logs, I see two messages: {code} [2019-05-23 16:09:59,373] INFO REST server listening at http://172.31.39.205:8083/, advertising URL http://worker1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer) {code} and {code} [2019-05-23 16:10:00,738] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer) {code} it takes 1365 ms to actually load REST resources, but the test is waiting on a port to be listening. 
[jira] [Resolved] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation
[ https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8415. -- Resolution: Fixed Reviewer: Randall Hauch Merged onto the `trunk` and `2.3` branches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'
[ https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846991#comment-16846991 ] ASF GitHub Bot commented on KAFKA-8407: --- rhauch commented on pull request #6789: KAFKA-8407: Fix validation of class and list configs in connector client overrides URL: https://github.com/apache/kafka/pull/6789 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connector client overrides broken on client configs with type 'Class' or > 'List' > --- > > Key: KAFKA-8407 > URL: https://issues.apache.org/jira/browse/KAFKA-8407 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Labels: connect > > When a connector request is submitted that overrides a client configuration > that is meant to contain the name of a class (such as > {{sasl.login.callback.handler.class}}), a 500 response is generated and the > following stack trace can be found in the logs for Connect: > > {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to > /connectors > (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61) > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Class > at > org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774) > at > org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565) > at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {quote} > This appears to be limited only to client configs that are meant to be > classes or lists due to the fact that {{ConfigDef.convertToString(...)}} > assumes its first argument is an instance of {{Class}} when its second > argument is {{ConfigDef.Type.CLASS}} and then casts accordingly, and acts > similarly for lists. If the second argument is anything else, {{toString()}} > is invoked on it without any casting, avoiding any problems. > > The cause of this is due to the fact that the newly-introduced > {{ConnectorClientConfigOverridePolicy}} interface returns a list of > {{ConfigValue}} instances for its validation. The {{value()}} for each of > these can be any type, although with the default implementations available > ({{All}}, {{None}}, {{Principal}}) if one is returned at all it's just the > same type of what was passed in for that particular config. In the case of > the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings > for the client configs are used. However, the > {{AbstractHerder.convertConfigValue(...)}} is then called for those raw > strings but with the {{ConfigDef.Type}} of the config based on the relevant > client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, > {{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). 
This > in turn can and will result in > {{ConfigDef.convertToString(someClassNameAsAString, ConfigDef.Type.CLASS)}} > being invoked. > > Although this isn't technically a comprehensive fix, a quick option would be > to invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} > before passing overrides to the policy. Technically, this would still lead to > problems if the policy decided to return just the name of a class for a > config that of type class instead, so we may want to investigate other > options as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
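The shape of the bug and of the proposed quick fix can be sketched outside Java (Python analogue; `resolver` stands in for `Class.forName` and the type constants are simplified): a converter that assumes CLASS-typed values are already class objects breaks on the raw string, while parsing the raw value first makes the conversion safe.

```python
CLASS, STRING = "CLASS", "STRING"

def convert_to_string(value, config_type):
    """Analogue of ConfigDef.convertToString: the CLASS branch assumes the
    value is already a class object (the unchecked cast in the stack trace)."""
    if config_type == CLASS:
        return value.__name__  # raises AttributeError when value is a raw str
    return str(value)

def parse(raw, config_type, resolver):
    """Analogue of the proposed quick fix: parse raw strings into typed
    values (as ConfigDef.parse would) before handing them to the converter."""
    if config_type == CLASS and isinstance(raw, str):
        return resolver(raw)  # stands in for Class.forName(raw)
    return raw

resolver = {"builtins.str": str}.get

# Feeding the raw string straight into the converter fails, as in the ticket:
try:
    convert_to_string("builtins.str", CLASS)
except AttributeError:
    pass  # the Python counterpart of the ClassCastException

# Parsing first succeeds:
print(convert_to_string(parse("builtins.str", CLASS, resolver), CLASS))  # prints "str"
```

As the ticket notes, this parse-first approach is only a partial guard: a policy that itself returns a bare class name would still trip the converter.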
[jira] [Resolved] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'
[ https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8407. -- Resolution: Fixed Reviewer: Randall Hauch Fix Version/s: 2.3.0 Merged onto the `trunk` and `2.3` branches. > Connector client overrides broken on client configs with type 'Class' or > 'List' > --- > > Key: KAFKA-8407 > URL: https://issues.apache.org/jira/browse/KAFKA-8407 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Labels: connect > Fix For: 2.3.0 > > > When a connector request is submitted that overrides a client configuration > that is meant to contain the name of a class (such as > {{sasl.login.callback.handler.class}}), a 500 response is generated and the > following stack trace can be found in the logs for Connect: > > {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to > /connectors > (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61) > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Class > at > org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774) > at > org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241) > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {quote} > This appears to be limited only to client configs that are meant to be > classes or lists due to the fact that {{ConfigDef.convertToString(...)}} > assumes its first argument is an instance of {{Class}} when its second > argument is {{ConfigDef.Type.CLASS}} and then casts accordingly, and acts > similarly for lists. If the second argument is anything else, {{toString()}} > is invoked on it without any casting, avoiding any problems. > > The cause of this is due to the fact that the newly-introduced > {{ConnectorClientConfigOverridePolicy}} interface returns a list of > {{ConfigValue}} instances for its validation. The {{value()}} for each of > these can be any type, although with the default implementations available > ({{All}}, {{None}}, {{Principal}}) if one is returned at all it's just the > same type of what was passed in for that particular config. In the case of > the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings > for the client configs are used. However, the > {{AbstractHerder.convertConfigValue(...)}} is then called for those raw > strings but with the {{ConfigDef.Type}} of the config based on the relevant > client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, > {{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This > in turn can and will result in > {{ConfigDef.convertToString(someClassNameAsAString, ConfigDef.Type.CLASS)}} > being invoked. > > Although this isn't technically a comprehensive fix, a quick option would be > to invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} > before passing overrides to the policy. 
Technically, this would still lead to > problems if the policy decided to return just the name of a class for a > config of type class instead, so we may want to investigate other > options as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
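The failure mode described above can be shown with a small self-contained sketch. This is a hypothetical simplification of {{ConfigDef.convertToString(...)}}, not the actual Kafka code: the cast succeeds for a value that was already parsed into a {{Class}}, and throws for a raw override string of the kind the herder passes through today.

```java
// Hypothetical simplification of ConfigDef.convertToString: when the declared
// type is CLASS the value is cast to Class, which fails for a raw String.
public class ConvertDemo {
    enum Type { CLASS, STRING }

    static String convertToString(Object value, Type type) {
        if (type == Type.CLASS) {
            return ((Class<?>) value).getName(); // ClassCastException for a String
        }
        return value.toString();
    }

    public static void main(String[] args) {
        // A parsed value works: the config was converted to a Class first.
        System.out.println(convertToString(java.util.ArrayList.class, Type.CLASS));
        try {
            // A raw override string does not: this is the Connect failure mode.
            convertToString("java.util.ArrayList", Type.CLASS);
        } catch (ClassCastException e) {
            System.out.println("caught ClassCastException");
        }
    }
}
```

This also illustrates why the suggested workaround of calling {{ConfigDef.parse(...)}} first helps: parsing converts the raw string into a {{Class}} before the conversion back to a string is attempted.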
[jira] [Created] (KAFKA-8419) Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers
Ryan P created KAFKA-8419: - Summary: Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers Key: KAFKA-8419 URL: https://issues.apache.org/jira/browse/KAFKA-8419 Project: Kafka Issue Type: Improvement Reporter: Ryan P Assignee: Ryan P The log4j Kafka appender supports SASL but lacks support for the callback handlers added with KIP-86. This Jira was created to request that the SASL callback handler client configuration be exposed in the KafkaLog4jAppender class for use by the underlying producer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign
Guozhang Wang created KAFKA-8420: Summary: Graceful handling when consumer switches from subscribe to manual assign Key: KAFKA-8420 URL: https://issues.apache.org/jira/browse/KAFKA-8420 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Guozhang Wang Today if a consumer switches between subscribe (and hence relies on group rebalance to get assignment) and manual assign, it may cause unnecessary rebalances. For example: 1. consumer.subscribe(); 2. consumer.poll(); // join-group request sent, returns empty because poll timeout 3. consumer.unsubscribe(); 4. consumer.assign(..); 5. consumer.poll(); // sync-group request received, and the assigned partitions do not match the current subscription-state. In this case it will try to re-join, which is not necessary. In the worst case (i.e. the leader keeps sending an incompatible assignment), this would cause the consumer to fall into endless re-joins. Although it is not a very common usage scenario, it is still worth handling better than the status quo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
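The consistency check behind step 5 can be sketched in a self-contained way. This is a hypothetical simplification of the subscription-state comparison, not the actual consumer code: after {{unsubscribe()}}/{{assign(..)}}, a late sync-group response carries partitions that no longer match what the consumer currently owns, so it requests a spurious re-join.

```java
import java.util.Set;

// Hypothetical sketch of the check described above: a stale group assignment
// arriving after a switch to manual assign() no longer matches the consumer's
// current state, so today's logic treats it as inconsistent and re-joins.
public class RejoinCheckDemo {
    static boolean needsRejoin(Set<String> assignedByLeader, Set<String> currentAssignment) {
        // If the leader's assignment is not contained in what the consumer now
        // owns, the state is considered inconsistent.
        return !currentAssignment.containsAll(assignedByLeader);
    }

    public static void main(String[] args) {
        Set<String> fromSyncGroup = Set.of("topicA-0", "topicA-1"); // stale group assignment
        Set<String> manuallyAssigned = Set.of("topicB-0");          // set via assign()
        System.out.println(needsRejoin(fromSyncGroup, manuallyAssigned)); // true -> spurious re-join
    }
}
```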
[jira] [Resolved] (KAFKA-8371) Remove ReplicaManager dependence from Partition
[ https://issues.apache.org/jira/browse/KAFKA-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8371. Resolution: Fixed > Remove ReplicaManager dependence from Partition > --- > > Key: KAFKA-8371 > URL: https://issues.apache.org/jira/browse/KAFKA-8371 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: tech-debt > > The dependence on ReplicaManager from the Partition logic makes testing this > class very cumbersome. Often we are just using ReplicaManager as a way to get > access to an additional dependency. We should make the actual dependencies > explicit and we should introduce smaller traits which encapsulate the state > we actually need. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8371) Remove ReplicaManager dependence from Partition
[ https://issues.apache.org/jira/browse/KAFKA-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847026#comment-16847026 ] ASF GitHub Bot commented on KAFKA-8371: --- hachikuji commented on pull request #6705: KAFKA-8371: Remove dependence on ReplicaManager from Partition URL: https://github.com/apache/kafka/pull/6705 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove ReplicaManager dependence from Partition > --- > > Key: KAFKA-8371 > URL: https://issues.apache.org/jira/browse/KAFKA-8371 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: tech-debt > > The dependence on ReplicaManager from the Partition logic makes testing this > class very cumbersome. Often we are just using ReplicaManager as a way to get > access to an additional dependency. We should make the actual dependencies > explicit and we should introduce smaller traits which encapsulate the state > we actually need. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
Guozhang Wang created KAFKA-8421: Summary: Allow consumer.poll() to return data in the middle of rebalance Key: KAFKA-8421 URL: https://issues.apache.org/jira/browse/KAFKA-8421 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Guozhang Wang With KIP-429 in place, today when a consumer is about to send a join-group request its owned partitions may not be empty, meaning that some of its fetched data can still be returned. Nevertheless, today the logic is strict: {code} if (!updateAssignmentMetadataIfNeeded(timer)) { return ConsumerRecords.empty(); } {code} I.e. if the consumer enters a rebalance it always returns no data. As an optimization, we can consider letting consumers still return messages that belong to their owned partitions even while within a rebalance, because we know it is safe that no one else would claim those partitions in this rebalance yet, and we can still commit offsets for them if the partitions need to be revoked after this rebalance. One thing we need to take care of, though, is the rebalance timeout: while consumers are processing those records they may not call the next poll() in time (think: Kafka Streams num.iterations mechanism), which may lead to the consumer dropping out of the group during the rebalance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
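The proposed optimization can be sketched with stdlib collections. This is a hypothetical illustration, not the actual fetcher code: instead of returning {{ConsumerRecords.empty()}} during a rebalance, keep only the fetched records whose partitions the consumer still owns, which KIP-429's incremental rebalancing makes safe since no other member can claim them in the current round.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: filter fetched records down to still-owned partitions
// rather than returning nothing while a rebalance is in progress.
public class OwnedPartitionFilterDemo {
    static Map<String, List<String>> filterToOwned(Map<String, List<String>> fetched,
                                                   Set<String> ownedPartitions) {
        return fetched.entrySet().stream()
                .filter(e -> ownedPartitions.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, List<String>> fetched = Map.of(
                "t-0", List.of("a", "b"),   // still owned: can be returned
                "t-1", List.of("c"));       // being revoked: must be held back
        System.out.println(filterToOwned(fetched, Set.of("t-0"))); // {t-0=[a, b]}
    }
}
```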
[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847033#comment-16847033 ] Sophie Blee-Goldman commented on KAFKA-8367: How many state stores are present in your topology? How many partitions? > Non-heap memory leak in Kafka Streams > - > > Key: KAFKA-8367 > URL: https://issues.apache.org/jira/browse/KAFKA-8367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: Pavel Savov >Priority: Major > Attachments: memory-prod.png, memory-test.png > > > We have been observing a non-heap memory leak after upgrading to Kafka > Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the > leak only happens when we enable stateful stream operations (utilizing > stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 > and ported the fix scheduled for release in 2.2.1 to our fork. It did not > stop the leak, however. > We are having this memory leak in our production environment where the > consumer group is auto-scaled in and out in response to changes in traffic > volume, and in our test environment where we have two consumers, no > autoscaling and relatively constant traffic. > Below is some information I'm hoping will be of help: > * RocksDB Config: > Block cache size: 4 MiB > Write buffer size: 2 MiB > Block size: 16 KiB > Cache index and filter blocks: true > Manifest preallocation size: 64 KiB > Max write buffer number: 3 > Max open files: 6144 > > * Memory usage in production > The attached graph (memory-prod.png) shows memory consumption for each > instance as a separate line. The horizontal red line at 6 GiB is the memory > limit. > As illustrated on the attached graph from production, memory consumption in > running instances goes up around autoscaling events (scaling the consumer > group either in or out) and associated rebalancing. 
It stabilizes until the > next autoscaling event but it never goes back down. > An example of scaling out can be seen from around 21:00 hrs where three new > instances are started in response to a traffic spike. > Just after midnight traffic drops and some instances are shut down. Memory > consumption in the remaining running instances goes up. > Memory consumption climbs again from around 6:00AM due to increased traffic > and new instances are being started until around 10:30AM. Memory consumption > never drops until the cluster is restarted around 12:30. > > * Memory usage in test > As illustrated by the attached graph (memory-test.png) we have a fixed number > of two instances in our test environment and no autoscaling. Memory > consumption rises linearly until it reaches the limit (around 2:00 AM on > 5/13) and Mesos restarts the offending instances, or we restart the cluster > manually. > > * No heap leaks observed > * Window retention: 2 or 11 minutes (depending on operation type) > * Issue not present in Kafka Streams 2.0.1 > * No memory leak for stateless stream operations (when no RocksDB stores are > used) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8419) Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers
[ https://issues.apache.org/jira/browse/KAFKA-8419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847043#comment-16847043 ] ASF GitHub Bot commented on KAFKA-8419: --- rnpridgeon commented on pull request #6799: KAFKA-8419 Add SASL callback handler support to KafkaLog4jAppender URL: https://github.com/apache/kafka/pull/6799 KIP-470 https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+Enable+KafkaLog4JAppender+to+use+SASL+Authentication+Callback+Handlers This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers > -- > > Key: KAFKA-8419 > URL: https://issues.apache.org/jira/browse/KAFKA-8419 > Project: Kafka > Issue Type: Improvement >Reporter: Ryan P >Assignee: Ryan P >Priority: Major > > The log4j Kafka appender supports SASL but lacks support for the callback > handlers added with KIP-86. This Jira was created to request that the SASL > callback handler client configuration be exposed in the KafkaLog4jAppender > class for use by the underlying producer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8333) Load high watermark checkpoint only once when handling LeaderAndIsr requests
[ https://issues.apache.org/jira/browse/KAFKA-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847047#comment-16847047 ] ASF GitHub Bot commented on KAFKA-8333: --- hachikuji commented on pull request #6696: KAFKA-8333; Cache checkpointed high watermarks for reuse on LeaderAndIsr request URL: https://github.com/apache/kafka/pull/6696 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Load high watermark checkpoint only once when handling LeaderAndIsr requests > > > Key: KAFKA-8333 > URL: https://issues.apache.org/jira/browse/KAFKA-8333 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Currently we reload the checkpoint file separately for every partition that > is first initialized on the broker. It would be more efficient to do this one > time only when we receive the LeaderAndIsr request and to reuse the state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847056#comment-16847056 ] Srikala commented on KAFKA-7500: [~ryannedolan], I configured connect mirrormaker for replicating topics between two clusters on kafka 1.1.1 from source to target. Can you please provide your input on the following? # The topics are created with a replication factor 1 in the target cluster even though the source cluster has replication factor 4. The default replication factor of the target cluster is 4. Obviously, the topic data is failing to replicate with the error: org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition target.TEST_TOPIC is [1], below required minimum [2] Replication works fine if I manually create the topics in the target cluster with replication factor 4 before starting the replication. # I see the following lines in the log: [2019-05-23 20:50:39,033] WARN [Producer clientId=producer-9] Error while fetching metadata with correlation id 2079 : \{source.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1023) [2019-05-23 20:50:39,039] WARN [Producer clientId=producer-8] Error while fetching metadata with correlation id 2175 : \{heartbeats=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1023) How are the topics created: source.checkpoints.internal and heartbeats? 3. Is this the right forum to ask questions during the evaluation? Thanks! > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Priority: Minor > Fix For: 2.4 > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
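If the replication-factor mismatch in the comment above comes from MM2's own topic-creation settings rather than the target cluster's default, a config fragment along these lines may address it. The property names here are assumed from KIP-382's config surface and should be verified against the MM2 version actually deployed:

```properties
# Assumed MM2 settings (names per KIP-382; verify against your MM2 release).
# Replication factor used for remote topics MM2 creates on the target:
replication.factor=4
# The internal heartbeats/checkpoints topics have their own settings:
heartbeats.topic.replication.factor=4
checkpoints.topic.replication.factor=4
```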
[jira] [Commented] (KAFKA-8333) Load high watermark checkpoint only once when handling LeaderAndIsr requests
[ https://issues.apache.org/jira/browse/KAFKA-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847071#comment-16847071 ] ASF GitHub Bot commented on KAFKA-8333: --- hachikuji commented on pull request #6800: KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas URL: https://github.com/apache/kafka/pull/6800 Currently we load the high watermark checkpoint separately for every replica that we load. This patch makes this loading logic lazy and caches the loaded map while a LeaderAndIsr request is being handled. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Load high watermark checkpoint only once when handling LeaderAndIsr requests > > > Key: KAFKA-8333 > URL: https://issues.apache.org/jira/browse/KAFKA-8333 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Currently we reload the checkpoint file separately for every partition that > is first initialized on the broker. It would be more efficient to do this one > time only when we receive the LeaderAndIsr request and to reuse the state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8093) Fix JavaDoc markup
[ https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847073#comment-16847073 ] ASF GitHub Bot commented on KAFKA-8093: --- pkleindl commented on pull request #6439: KAFKA-8093: fixed some javadoc errors URL: https://github.com/apache/kafka/pull/6439 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix JavaDoc markup > -- > > Key: KAFKA-8093 > URL: https://issues.apache.org/jira/browse/KAFKA-8093 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Matthias J. Sax >Assignee: Patrik Kleindl >Priority: Trivial > > Running `./gradlew install` gives the following warning > {code:java} > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: java.nio.channels.Selector > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: > java.nio.channels.Selector#wakeup() wakeup() > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34: > warning - Tag @link: reference not found: > org.apache.kafka.clients.producer.ProducerRecord ProducerRecord > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. 
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > {code} > Those should be fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8093) Fix JavaDoc markup
[ https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrik Kleindl resolved KAFKA-8093. --- Resolution: Fixed Fixed by other commits, see PR for discussion. > Fix JavaDoc markup > -- > > Key: KAFKA-8093 > URL: https://issues.apache.org/jira/browse/KAFKA-8093 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Matthias J. Sax >Assignee: Patrik Kleindl >Priority: Trivial > > Running `./gradlew install` gives the following warning > {code:java} > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: java.nio.channels.Selector > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: > java.nio.channels.Selector#wakeup() wakeup() > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34: > warning - Tag @link: reference not found: > org.apache.kafka.clients.producer.ProducerRecord ProducerRecord > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. 
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > {code} > Those should be fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8093) Fix JavaDoc markup
[ https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrik Kleindl reassigned KAFKA-8093: - Assignee: Matthias J. Sax (was: Patrik Kleindl) > Fix JavaDoc markup > -- > > Key: KAFKA-8093 > URL: https://issues.apache.org/jira/browse/KAFKA-8093 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > > Running `./gradlew install` gives the following warning > {code:java} > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: java.nio.channels.Selector > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: > warning - Tag @link: reference not found: > java.nio.channels.Selector#wakeup() wakeup() > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34: > warning - Tag @link: reference not found: > org.apache.kafka.clients.producer.ProducerRecord ProducerRecord > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. 
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: > warning - @Header is an unknown tag. > {code} > Those should be fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8422) Client should not use old versions of OffsetsForLeaderEpoch
Jason Gustafson created KAFKA-8422: -- Summary: Client should not use old versions of OffsetsForLeaderEpoch Key: KAFKA-8422 URL: https://issues.apache.org/jira/browse/KAFKA-8422 Project: Kafka Issue Type: Bug Components: consumer Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 2.3.0 For KIP-320, we changed the permissions of the OffsetsForLeaderEpoch to be topic-level so that consumers did not require Cluster permission. However, there is no way for a consumer to know whether the broker is new enough to support this permission scheme. The only way to be sure is to use the version of this API that was bumped in 2.3. For older versions, we should revert to the old behavior. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8404) Authorization header is not passed in Connect when forwarding REST requests
[ https://issues.apache.org/jira/browse/KAFKA-8404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Yokota updated KAFKA-8404: - Affects Version/s: 2.0.0 > Authorization header is not passed in Connect when forwarding REST requests > --- > > Key: KAFKA-8404 > URL: https://issues.apache.org/jira/browse/KAFKA-8404 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Robert Yokota >Priority: Blocker > Fix For: 2.3.0 > > > When Connect forwards a REST request from one worker to another, the > Authorization header is not forwarded. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8423) Update ducktape to not use deprecated APIs
Matthias J. Sax created KAFKA-8423: -- Summary: Update ducktape to not use deprecated APIs Key: KAFKA-8423 URL: https://issues.apache.org/jira/browse/KAFKA-8423 Project: Kafka Issue Type: Improvement Components: system tests Affects Versions: 2.3.0 Reporter: Matthias J. Sax Running system tests locally, I see the following warnings: {code:java} /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: CryptographyDeprecationWarning: encode_point has been deprecated on EllipticCurvePublicNumbers and will be removed in a future version. Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding. m.add_string(self.Q_C.public_numbers().encode_point()) /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: CryptographyDeprecationWarning: Support for unsafe construction of public numbers from encoded data will be removed in a future version. Please use EllipticCurvePublicKey.from_encoded_point self.curve, Q_S_bytes /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: CryptographyDeprecationWarning: encode_point has been deprecated on EllipticCurvePublicNumbers and will be removed in a future version. Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding. hm.add_string(self.Q_C.public_numbers().encode_point()) {code} We should update the code to not use deprecated APIs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8424) Replace ListGroups request/response with automated protocol
Boyang Chen created KAFKA-8424: -- Summary: Replace ListGroups request/response with automated protocol Key: KAFKA-8424 URL: https://issues.apache.org/jira/browse/KAFKA-8424 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847170#comment-16847170 ] Boyang Chen commented on KAFKA-8311: [~clearpal7] No worry, let me take a look at your fix! > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When a stream application crashes due to an underlying consumer commit timeout, we > have seen the following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let the user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with a more meaningful message either on consumer or stream level to let the user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847174#comment-16847174 ] Boyang Chen commented on KAFKA-8342: [~clearpal7] Let me know if this sounds interesting to you. > Admin tool to setup Kafka Stream topology (internal) topics > --- > > Key: KAFKA-8342 > URL: https://issues.apache.org/jira/browse/KAFKA-8342 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > We have seen customers who need to deploy their application to production > environment but don't have access to create changelog and repartition topics. > They need to ask admin team to manually create those topics before proceeding > to start the actual stream job. We could add an admin tool to help them go > through the process quicker by providing a command that could > # Read through current stream topology > # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847211#comment-16847211 ] Sebastiaan commented on KAFKA-8412: --- [~mjsax] yes we are using EOS. And yeah you're right, I was surprised it was solved by a null check for the other ticket when it also seemed to me the situation should be avoided in the first place. But I'll leave that to the actual developers. > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Priority: Minor > > I found a closed issue and replied there but decided to open one myself > because, although they're related, they're slightly different. The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there has been to implement a null check around closing a producer > because in some cases the producer is already null there (has been closed > already). > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close. 
This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > If I look at the source code at this point, I see a nice null check in the > close method, but not in the flush method that is called just before that: > {code:java} > public void flush() { > this.log.debug("Flushing producer"); > this.producer.flush(); > this.checkForException(); > } > public void close() { > this.log.debug("Closing producer"); > if (this.producer != null) { > this.producer.close(); > this.producer = null; > } > this.checkForException(); > }{code} > Seems to my (ignorant) eye that the flush method should also be wrapped in a > null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
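The guard the reporter proposes can be sketched as a standalone snippet. This is a hypothetical illustration, not the real RecordCollectorImpl: the `Producer` stand-in, the class name, and the fields are invented for the sketch, and the actual fix would live in Kafka's own code.

```java
// Hypothetical sketch of the proposed fix: guard flush() with the same
// null check that close() already has. "Producer" is a minimal stand-in
// for the Kafka producer client, not the real class.
public class RecordCollectorSketch {

    static class Producer {
        void flush() { /* no-op stand-in */ }
        void close() { /* no-op stand-in */ }
    }

    private Producer producer = new Producer();

    public void flush() {
        // Proposed guard: during shutdown, close() may already have
        // nulled out the producer before flush() is invoked again.
        if (producer != null) {
            producer.flush();
        }
    }

    public void close() {
        // Existing pattern quoted in the report: null check, then null-out.
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }
}
```

With this guard in place, a flush() that races with or follows close() becomes a no-op instead of a NullPointerException.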
[jira] [Commented] (KAFKA-8424) Replace ListGroups request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847241#comment-16847241 ]

ASF GitHub Bot commented on KAFKA-8424:
---------------------------------------

abbccdda commented on pull request #6805: KAFKA-8424: replace ListGroups request/response with automated protocol
URL: https://github.com/apache/kafka/pull/6805

As the title suggests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Replace ListGroups request/response with automated protocol
> -----------------------------------------------------------
>
>                 Key: KAFKA-8424
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8424
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847245#comment-16847245 ]

WooYoung commented on KAFKA-8342:
---------------------------------

[~bchen225242] I'm interested in this ticket and would like to help by providing the command tool. Could I take this ticket?

> Admin tool to setup Kafka Stream topology (internal) topics
> -----------------------------------------------------------
>
>                 Key: KAFKA-8342
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8342
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Boyang Chen
>            Priority: Major
>              Labels: newbie
>
> We have seen customers who need to deploy their application to a production environment but don't have access to create changelog and repartition topics. They need to ask the admin team to manually create those topics before proceeding to start the actual stream job. We could add an admin tool to help them go through the process quicker by providing a command that could:
> # Read through the current stream topology
> # Create the corresponding topics as needed, even including output topics.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847286#comment-16847286 ]

Boyang Chen commented on KAFKA-8342:
------------------------------------

[~clearpal7] Yeah, feel free to take it. I will probably be away for a couple of days; you could take an initial look and see how the tool should be built. Thank you for the help!

> Admin tool to setup Kafka Stream topology (internal) topics
> -----------------------------------------------------------
>
>                 Key: KAFKA-8342
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8342
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Boyang Chen
>            Priority: Major
>              Labels: newbie
>
> We have seen customers who need to deploy their application to a production environment but don't have access to create changelog and repartition topics. They need to ask the admin team to manually create those topics before proceeding to start the actual stream job. We could add an admin tool to help them go through the process quicker by providing a command that could:
> # Read through the current stream topology
> # Create the corresponding topics as needed, even including output topics.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
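The two steps in the KAFKA-8342 description can be sketched as a small planning helper. The naming patterns `<application.id>-<storeName>-changelog` and `<application.id>-<name>-repartition` are Kafka Streams' documented internal-topic conventions; the class and method names below are hypothetical, and a real tool would feed the resulting names to `AdminClient.createTopics()`, which is omitted here to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed tool's planning step: derive the
// internal topic names a Streams application would need. Class and method
// names are invented; only the naming conventions come from Kafka Streams.
public class InternalTopicPlanner {

    static List<String> plan(String applicationId,
                             List<String> storeNames,
                             List<String> repartitionNames) {
        List<String> topics = new ArrayList<>();
        for (String store : storeNames) {
            // State store changelogs: <application.id>-<storeName>-changelog
            topics.add(applicationId + "-" + store + "-changelog");
        }
        for (String name : repartitionNames) {
            // Repartition topics: <application.id>-<name>-repartition
            topics.add(applicationId + "-" + name + "-repartition");
        }
        return topics;
    }
}
```

For an application with id `my-app` and a state store `counts-store`, the planner yields `my-app-counts-store-changelog`; the admin team (or the tool itself, given sufficient ACLs) could then create those topics before the stream job starts.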