I do not believe this to be possible.
Since the state will almost certainly affect the behavior of the
function in some way (usually what it outputs), it is probably a better
idea to test that behavior instead. (I suppose you'd want tests like
that anyway.)
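A rough sketch of what I mean, not taken from your code: Log, MaxTimestampFunction and runHarness are hypothetical stand-ins, and I'm assuming a Flink 1.x API (open(Configuration)) with Flink's test utilities on the test classpath. The function keeps a per-key ValueState and emits it with every element, so the per-key separation shows up in the harness output without touching the state directly.

```kotlin
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the generated logs.
data class Log(val tenant: String, val timestamp: Long)

// Hypothetical function: remembers the highest timestamp seen per key
// and emits "<tenant>:<max so far>" for every incoming element.
class MaxTimestampFunction : KeyedProcessFunction<String, Log, String>() {
    @Transient private lateinit var watermark: ValueState<Long>

    override fun open(parameters: Configuration) {
        watermark = runtimeContext.getState(
            ValueStateDescriptor("watermark", Types.LONG)
        )
    }

    override fun processElement(value: Log, ctx: Context, out: Collector<String>) {
        // value() is null until this key has written state for the first time.
        val current = watermark.value() ?: Long.MIN_VALUE
        val updated = maxOf(current, value.timestamp)
        watermark.update(updated)
        out.collect("${value.tenant}:$updated")
    }
}

// Runs three elements through the harness and returns the emitted values.
fun runHarness(): List<String> {
    val harness = KeyedOneInputStreamOperatorTestHarness(
        KeyedProcessOperator(MaxTimestampFunction()),
        KeySelector<Log, String> { it.tenant },
        Types.STRING
    )
    harness.open()
    try {
        harness.processElement(Log("A", 10L), 10L)
        harness.processElement(Log("B", 5L), 5L)
        harness.processElement(Log("A", 3L), 3L)
        return harness.extractOutputValues()
    } finally {
        harness.close()
    }
}

fun main() {
    // If state leaked across keys, "B" would report 10 instead of 5;
    // if state were lost, the second "A" would report 3 instead of 10.
    check(runHarness() == listOf("A:10", "B:5", "A:10"))
}
```

If your function only emits on timers, the same idea should apply: advance the watermark or processing time on the harness and assert on what comes out for each key.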
On 3/3/2021 8:10 PM, Rion Williams wrote:
Hi all!
Is it possible to apply assertions against the underlying state stores
within a KeyedProcessFunction using the existing
KeyedOneInputStreamOperatorTestHarness class within unit tests?
Basically I wanted to ensure that if I passed in two elements each
with unique keys that I would be able to query the underlying state
stores to ensure they were working as expected. I don’t really see a
mechanism that would support such behavior (i.e. give me the state
store for key n from the operator?)
    @Test
    fun `Verify that instances with different keys retain separate watermarks`() {
        // Arrange
        val logs = listOf(
            StreamRecord(TestGenerator.generateLog(tenant = "A")),
            StreamRecord(TestGenerator.generateLog(tenant = "B")),
        )

        // Act
        magicWindowHarness
            .processElements(logs)

        // Assert (I'd like to access the state by key for each here)
        assert(magicWindowHarness.getStateForKey("A"), ...)
        assert(magicWindowHarness.getStateForKey("B"), ...)
    }
Is something like this possible or is there a better way to access the
underlying state store? It seemed to work as expected when only a
single key was involved, but when multiple keys were involved, things
seemed to fall apart. The current testing documentation [0] is
fantastic, however I think this might qualify as a more advanced task
than it covered.
At present, all of the state stores of the underlying function are
declared private, which may or may not be relevant:
    @Transient private lateinit var watermark: ValueState<Long>
    @Transient private lateinit var scheduledEvictions: MapState<Long, Long>
Any recommendations or advice would be greatly appreciated and I'll be
happy to provide any additional context/details as needed.
Thanks a lot!
Rion
[0]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html