mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1134781624
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -48,7 +49,9 @@ final class StateManagerUtil {
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
-        return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
+        // should not prepend timestamp when restoring records for versioned store, as
+        // timestamp is used separately during put() process for restore of versioned stores
+        return (isTimestamped(store) && !isVersioned(store)) ? rawValueToTimestampedValue() : identity();

Review Comment:
   Nice one!

##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp,
     }
 
     /**
-     * Test-only processor for inserting records into a versioned store while also tracking
-     * them separately in-memory, and performing checks to validate expected store contents.
-     * Forwards the number of failed checks downstream for consumption.
+     * @param topic topic to produce to
+     * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to
+     *                    the topic. This method will add the produced data into this in-memory
+     *                    tracker in addition to producing to the topic, in order to keep the two
+     *                    in sync.
+     * @param timestamp timestamp to produce with
+     * @param keyValues key-value pairs to produce
+     *
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final DataTracker dataTracker,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... keyValues) {
+        produceDataToTopic(topic, timestamp, keyValues);
+
+        for (final KeyValue<Integer, String> keyValue : keyValues) {
+            dataTracker.add(keyValue.key, timestamp, keyValue.value);
+        }
+
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for validating expected contents of a versioned store, and forwards
+     * the number of failed checks downstream for consumption. Callers specify whether the
+     * processor should also be responsible for inserting records into the store (while also
+     * tracking them separately in-memory for use in validation).
      */
     private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
         private VersionedKeyValueStore<Integer, String> store;
 
+        // whether or not the processor should write records to the store as they arrive.
+        // must be false for global stores.

Review Comment:
   Why must it be false for global stores?
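(For context on the `converterForStore` change above: a plain timestamped store encodes each value as an 8-byte record timestamp prepended to the raw value, which is what `rawValueToTimestampedValue()` produces during restore, whereas a versioned store receives the timestamp as a separate `put()` argument during restore, so its changelog value must pass through unchanged. The snippet below is only an illustrative sketch of that prepending step, not the actual Kafka Streams implementation.)

```java
import java.nio.ByteBuffer;

// Illustrative sketch only -- not Kafka Streams internals. Shows the byte layout a
// timestamped (non-versioned) store expects after restore conversion: an 8-byte
// record timestamp followed by the raw changelog value. Versioned stores skip this
// step because the timestamp is passed separately to put() during restore.
final class TimestampedValueLayoutSketch {

    private TimestampedValueLayoutSketch() {}

    static byte[] prependTimestamp(final long recordTimestamp, final byte[] rawValue) {
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + rawValue.length);
        buffer.putLong(recordTimestamp); // timestamp header consumed by the timestamped store
        buffer.put(rawValue);            // original changelog value, left as-is
        return buffer.array();
    }
}
```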
##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
             .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   Seems we only verify that the regular `stream().process().to()` path did not produce any errors, but we don't verify the global store at all? In your original comment, you say we cannot get the data from the global KTable because we cannot inject a `Processor` -- well, we could use `addGlobalStore` instead of `globalTable` to add a `Processor`.
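(Regarding the `addGlobalStore` suggestion: below is a rough sketch of what that could look like inside this test class, reusing its existing constants and imports. The versioned store builder calls are assumed from the public `Stores` factory for versioned stores, `HISTORY_RETENTION` is assumed to be a retention period in milliseconds, and the verification hook is hypothetical. With `addGlobalStore`, the supplied processor must maintain the store itself, which is also where checks against the `DataTracker` could be performed.)

```java
// Rough sketch (not part of this PR): wire the versioned global store via addGlobalStore()
// so that a custom state-update processor can both maintain the store and verify its
// contents against the tracked data.
streamsBuilder.addGlobalStore(
    Stores.versionedKeyValueStoreBuilder(
            Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
            Serdes.Integer(),
            Serdes.String())
        .withLoggingDisabled(), // global stores are sourced from the topic, not a changelog
    globalTableTopic,
    Consumed.with(Serdes.Integer(), Serdes.String()),
    () -> new Processor<Integer, String, Void, Void>() {
        private VersionedKeyValueStore<Integer, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final Record<Integer, String> record) {
            // with addGlobalStore(), this processor is responsible for updating the store
            store.put(record.key(), record.value(), record.timestamp());
            // hypothetical hook: compare store.get(record.key()) against the DataTracker here
        }
    });
```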