mjsax commented on code in PR #21455:
URL: https://github.com/apache/kafka/pull/21455#discussion_r2844032751


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class TimestampedKeyValueStoreWithHeadersTest {
+
+    private static final String STORE_NAME = "headers-store";
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    private static final Headers HEADERS1 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "1.0".getBytes());
+
+    private static final Headers HEADERS2 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "2.0".getBytes());
+
+    private static final Headers EMPTY_HEADERS = new RecordHeaders();
+
+    public TestInfo testInfo;
+
+    @BeforeAll
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @BeforeEach
+    public void beforeTest(final TestInfo testInfo) throws InterruptedException {
+        this.testInfo = testInfo;
+        final String uniqueTestName = safeUniqueTestName(testInfo);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+
+    }
+
+    @AfterEach
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data with headers
+        int numRecordsProduced = 0;
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1,
+            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 5, HEADERS2,
+            KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2,
+            EMPTY_HEADERS,
+            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(0, receivedRecord.value);
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceDataToTopicWithHeaders(inputStream, baseTimestamp, new RecordHeaders(), KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + "-changelog";
+        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
+        assertEquals("compact", changelogTopicConfig.getProperty("cleanup.policy"));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data with headers
+        final Map<Integer, Optional<ValueTimestampHeaders<String>>> expectedData = new HashMap<>();
+        int initialRecordsProduced = 0;
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1,
+            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a0", baseTimestamp, HEADERS1)));
+        expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b0", baseTimestamp, HEADERS1)));
+        expectedData.put(3, Optional.empty());  // null value
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 5, HEADERS2,
+            KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a5", baseTimestamp + 5, HEADERS2)));
+        expectedData.put(2, Optional.empty());  // null value
+        expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c5", baseTimestamp + 5, HEADERS2)));
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 10, EMPTY_HEADERS,
+            KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10"));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a10", baseTimestamp + 10, EMPTY_HEADERS)));
+        expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b10", baseTimestamp + 10, EMPTY_HEADERS)));
+        expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c10", baseTimestamp + 10, EMPTY_HEADERS)));
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app - use processor WITHOUT validation of initial data, just write to store
+        streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records to verify restored store works correctly
+        final Headers finalHeaders = new RecordHeaders().add("final", "true".getBytes());
+        final int additionalRecordsProduced = produceDataToTopicWithHeaders(inputStream, baseTimestamp + 12, finalHeaders,
+            KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(0, receivedRecord.value);
+        }
+    }
+
+    @Test
+    public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(TimestampedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
+    }
+
+    private void shouldManualUpgradeFromTimestampedToHeaders(final Topology originalTopology) throws Exception {

Review Comment:
   We can leave this as-is, but I guess it's a copy-and-paste artifact from the "version store" test, which has a similar helper; the difference is that the helper is used twice there.
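   For illustration, a minimal sketch of the pattern referenced above (class and method names here are hypothetical, not the actual version-store test code): a helper parameterized by `Topology` pays for itself when two `@Test` methods share it, whereas with a single caller the body could simply be inlined into the test.

   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;

   import org.junit.jupiter.api.Test;

   public class HelperReusePatternSketch {

       @Test
       public void shouldUpgradeDslTopology() throws Exception {
           // first caller: a DSL-built topology (empty here, for brevity)
           runUpgradeScenario(new StreamsBuilder().build());
       }

       @Test
       public void shouldUpgradePapiTopology() throws Exception {
           // second caller: a PAPI-built topology; two callers justify the shared helper
           runUpgradeScenario(new Topology());
       }

       // shared scenario body, parameterized by the pre-upgrade topology
       private void runUpgradeScenario(final Topology originalTopology) throws Exception {
           // start the app with originalTopology, produce data, close, swap in
           // the upgraded store, restart, and verify the restored contents
       }
   }
   ```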


