mjsax commented on code in PR #21455: URL: https://github.com/apache/kafka/pull/21455#discussion_r2844072876
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java: ########## @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.Processor; +import 
org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.ValueTimestampHeaders; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class TimestampedKeyValueStoreWithHeadersTest { + + private static final String STORE_NAME = "headers-store"; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static final Headers HEADERS1 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "1.0".getBytes()); + + private static final Headers HEADERS2 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "2.0".getBytes()); + + private static final Headers EMPTY_HEADERS = new RecordHeaders(); + + public TestInfo testInfo; + + @BeforeAll + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void after() { + 
CLUSTER.stop(); + } + + @BeforeEach + public void beforeTest(final TestInfo testInfo) throws InterruptedException { + this.testInfo = testInfo; + final String uniqueTestName = safeUniqueTestName(testInfo); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + + } + + @AfterEach + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data with headers + int numRecordsProduced = 0; + + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 5, HEADERS2, + KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2, + EMPTY_HEADERS, + KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertEquals(receivedRecord.value, 0); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(false), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceDataToTopicWithHeaders(inputStream, baseTimestamp, new RecordHeaders(), KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + "-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertEquals(changelogTopicConfig.getProperty("cleanup.policy"), "compact"); + } + + @Test + public void shouldRestore() throws Exception { + StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + Stores.timestampedKeyValueStoreBuilderWithHeaders( + 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data with headers + final Map<Integer, Optional<ValueTimestampHeaders<String>>> expectedData = new HashMap<>(); + int initialRecordsProduced = 0; + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a0", baseTimestamp, HEADERS1))); + expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b0", baseTimestamp, HEADERS1))); + expectedData.put(3, Optional.empty()); // null value + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 5, HEADERS2, + KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a5", baseTimestamp + 5, HEADERS2))); + expectedData.put(2, Optional.empty()); // null value + expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c5", baseTimestamp + 5, HEADERS2))); + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 10, EMPTY_HEADERS, + KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); + expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a10", baseTimestamp + 10, EMPTY_HEADERS))); + expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b10", baseTimestamp + 10, EMPTY_HEADERS))); + expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c10", baseTimestamp + 10, EMPTY_HEADERS))); + + 
// wait for output + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app - use processor WITHOUT validation of initial data, just write to store + streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) Review Comment: I missed this on the last review, but don't we need to pass in `DataTracker` (or at least some `HashMap` object) here, with the expected store content _after_ restoration? (cf. the corresponding test for "versioned state store", which I believe was used as a blueprint for this test.) Given that this test is not failing right now, it seems something is off with the test setup: -- I think the problem is that we only use three records with keys `1,2,3`, and after restoration we update all three keys, overwriting everything we got from restore, so even if restore had failed, we would not notice. I think, after restoration, we should pipe records with different keys (and maybe use `putIfAbsent` to also verify that no key exists already, which would be incorrect), to ensure restoration did work correctly? (For the versioned-store test, we could use the same keys because everything is immutable inside the store, based on ts -- and after restoration we use different timestamps, which are effectively inserts into the store -- but with header stores, we get different behavior.) 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
