Hi all! Is it possible to apply assertions against the underlying state stores within a KeyedProcessFunction using the existing KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I wanted to ensure that if I passed in two elements each with unique keys that I would be able to query the underlying state stores to ensure they were working as expected. I don’t really see a mechanism that would support such behavior (i.e. give me the state store for key n from the operator?)
@Test fun `Verify that instances with different keys retain separate watermarks`() { // Arrange val logs = listOf( StreamRecord(TestGenerator.generateLog(tenant = "A")), StreamRecord(TestGenerator.generateLog(tenant = "B")), ) // Act magicWindowHarness .processElements(logs) // Assert (I'd like to access the state by key for each here) assert(magicWindowHarness.getStateForKey("A"), ...) assert(magicWindowHarness.getStateForKey("B"), ...) } Is something like this possible or is there a better way to access the underlying state store? It seemed to work as expected when only a single key was involved, but when multiple keys were involved, things seemed to fall apart. The current testing documentation [0] is fantastic, however I think this might qualify as a more advanced task than it covered. At present all of the state stores of the underlying function are privately declared, which may/may not be relevant: @Transient private lateinit var watermark: ValueState<Long> @Transient private lateinit var scheduledEvictions: MapState<Long, Long> Any recommendations or advice would be greatly appreciated and I'll be happy to provide any additional context/details as needed. Thanks a lot! Rion [0]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html