wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   If you find it easier to understand, sure. That is fine.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final 
KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   I see what you mean but we actually were going to create the store and leave 
it empty for zero duration. The point was to make it easier to change the grace 
period if desired so the store isn't getting created and destroyed. Something 
about making it more backward compatible.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   yep sure.
   
   (Edit: The class it extends also uses JUnit 4, and I can't change this one 
without also changing that and all other join integration tests.)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - 
gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {

Review Comment:
   That is a good idea. I'm also in favor of having `put` decide.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String 
valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + 
expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) 
{
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   Ah, this should be removed. It will be added back in the next PR (or the one 
after).



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final 
KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   So we are going to make a store even when the grace period is zero. This 
should make changing the grace period to and from zero smoother. So if someone 
set the grace period to zero then we can evict everything from the buffer even 
after the grace period is changed.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private KStream<Long, String> leftStream;
+    private KTable<Long, String> rightTable;
+    private Joined<Long, String, String> joined;
+
+    public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) 
{
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+        appID = "stream-table-join-integration-test";
+        builder = new StreamsBuilder();
+        joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), 
"Grace", Duration.ZERO);

Review Comment:
   I'll add some. I think I need to rework what is in the integration tests and 
the unit tests. Should be fixed.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - 
gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {
+                doJoin(record);
+            } else {
+                buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext());
+            }
+            buffer.get().evictWhile(() -> buffer.get().minTimestamp() <= 
deadline, this::emit);

Review Comment:
   It should be the same. So sure, I can go with the simple approach.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   I'm going to move the relevant tests to the integration tests and remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to