[
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692697#comment-16692697
]
ASF GitHub Bot commented on KAFKA-6567:
---------------------------------------
guozhangwang closed pull request #5922: KAFKA-6567: Remove KStreamWindowReducer
URL: https://github.com/apache/kafka/pull/5922
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
deleted file mode 100644
index babe3ebd836..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
-
-class KStreamWindowReduce<K, V, W extends Window> extends
KStreamWindowAggregate<K, V, V, W> {
- KStreamWindowReduce(final Windows<W> windows,
- final String storeName,
- final Reducer<V> reducer) {
- super(
- windows,
- storeName,
- () -> null,
- (key, newValue, oldValue) -> oldValue == null ? newValue :
reducer.apply(oldValue, newValue)
- );
- }
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index ecfe1554815..dfead3e6336 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -147,7 +147,7 @@
return aggregateBuilder.build(
REDUCE_NAME,
materialize(materializedInternal),
- new KStreamWindowReduce<>(windows,
materializedInternal.storeName(), reducer),
+ new KStreamWindowAggregate<>(windows,
materializedInternal.storeName(), aggregateBuilder.reduceInitializer,
aggregatorForReducer(reducer)),
materializedInternal.isQueryable(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
materializedInternal.valueSerde());
@@ -216,4 +216,8 @@
}
return builder;
}
+
+ private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer)
{
+ return (aggKey, value, aggregate) -> aggregate == null ? value :
reducer.apply(aggregate, value);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
deleted file mode 100644
index 634cb2f35a5..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.streams.test.OutputVerifier;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static java.time.Duration.ofMillis;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-
-public class KStreamWindowReduceTest {
-
- private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-
- @Test
- public void shouldLogAndMeterOnNullKey() {
-
- final StreamsBuilder builder = new StreamsBuilder();
- builder
- .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)))
- .reduce((value1, value2) -> value1 + "+" + value2);
-
-
- try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister();
- driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
- LogCaptureAppender.unregister(appender);
-
- assertEquals(1.0, getMetricByName(driver.metrics(),
"skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to
null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
- }
- }
-
- @Deprecated // testing deprecated functionality (behavior of until)
- @Test
- public void shouldLogAndMeterOnExpiredEvent() {
-
- final StreamsBuilder builder = new StreamsBuilder();
- builder
- .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(5L)).until(100))
- .reduce((value1, value2) -> value1 + "+" + value2)
- .toStream()
- .map((key, value) -> new KeyValue<>(key.toString(), value))
- .to("output");
-
-
- try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
-
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
- final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister();
- driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "1", 1L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "2", 2L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "3", 3L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "4", 4L));
- driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
- LogCaptureAppender.unregister(appender);
-
- final Metric dropMetric = driver.metrics().get(new MetricName(
- "late-record-drop-total",
- "stream-processor-node-metrics",
- "The total number of occurrence of late-record-drop
operations.",
- mkMap(
- mkEntry("client-id",
"topology-test-driver-virtual-thread"),
- mkEntry("task-id", "0_0"),
- mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002")
- )
- ));
-
- assertThat(dropMetric.metricValue(), equalTo(5.0));
- assertThat(appender.getMessages(), hasItems(
- "Skipping record for expired window. key=[k] topic=[TOPIC]
partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC]
partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC]
partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC]
partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC]
partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
- ));
-
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver),
"[k@100/105]", "100", 100);
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver),
"[k@5/10]", "5", 5);
- assertThat(driver.readOutput("output"), nullValue());
- }
- }
-
- private ProducerRecord<String, String> getOutput(final TopologyTestDriver
driver) {
- return driver.readOutput("output", new StringDeserializer(), new
StringDeserializer());
- }
-}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
> KStreamWindowReduce can be replaced by KStreamWindowAggregate
> -------------------------------------------------------------
>
> Key: KAFKA-6567
> URL: https://issues.apache.org/jira/browse/KAFKA-6567
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Samuel Hawker
> Priority: Major
> Labels: newbie
> Fix For: 2.2.0
>
>
> This is a tech debt worth cleaning up: KStreamWindowReduce should be able to
> be replaced by KStreamWindowAggregate. In fact, we have already done this for
> session windows, where in {{SessionWindowedKStreamImpl}} we use
> {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}}
> to replace reducer with aggregator.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)