Hi all!

Is it possible to assert against the underlying state stores of a
KeyedProcessFunction in unit tests using the existing
KeyedOneInputStreamOperatorTestHarness class? Basically, I want to pass in
two elements with different keys and then query the underlying state stores
to verify that each key's state is correct. I don't see a mechanism that
supports this (i.e. "give me the state store for key n from the operator").

@Test
fun `Verify that instances with different keys retain separate watermarks`() {
    // Arrange
    val logs = listOf(
        StreamRecord(TestGenerator.generateLog(tenant = "A")),
        StreamRecord(TestGenerator.generateLog(tenant = "B")),
    )

    // Act
    magicWindowHarness
        .processElements(logs)

    // Assert (I'd like to access the state for each key here)
    assert(magicWindowHarness.getStateForKey("A"), ...)
    assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible, or is there a better way to access the
underlying state store? It seemed to work as expected when only a single
key was involved, but when multiple keys were involved, things seemed to
fall apart. The current testing documentation [0] is fantastic; however, I
think this may be a more advanced task than it covers.
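Flink's keyed state is scoped to the key currently being processed: a ValueState handle such as `watermark` returns the value for whichever key the operator's key context points at, not for every key at once. The plain-Kotlin model below has no Flink dependency, and `KeyedValueState`, `valueForKey` and `process` are hypothetical names, so it is only a sketch of why a single-key test can look fine while a multi-key test needs the key context switched before each read. Whether the harness exposes an equivalent hook (for example, setting the operator's current key before reading state) is an assumption to verify against the Flink version in use.

```kotlin
// Toy model of Flink-style keyed state (hypothetical; not Flink's API).
// Every read and write is scoped to `currentKey`, mirroring the key context
// that Flink sets before invoking processElement for each record.
class KeyedValueState<K : Any, V : Any> {
    private val values = mutableMapOf<K, V>()
    var currentKey: K? = null

    fun value(): V? = values[requireNotNull(currentKey) { "no current key set" }]

    fun update(newValue: V) {
        values[requireNotNull(currentKey) { "no current key set" }] = newValue
    }
}

// Hypothetical helper in the spirit of the wished-for getStateForKey:
// switch the key context, read the value, then restore the previous key.
fun <K : Any, V : Any> KeyedValueState<K, V>.valueForKey(key: K): V? {
    val previous = currentKey
    currentKey = key
    try {
        return value()
    } finally {
        currentKey = previous
    }
}

// Simulates processing one element: set the key, then advance that key's watermark.
fun process(watermark: KeyedValueState<String, Long>, tenant: String, timestamp: Long) {
    watermark.currentKey = tenant
    val current = watermark.value() ?: Long.MIN_VALUE
    watermark.update(maxOf(current, timestamp))
}
```

Processing tenant A at 100, B at 50 and A again at 80 leaves `valueForKey("A")` at 100 and `valueForKey("B")` at 50, while a bare `value()` only sees key A, the last key processed.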

At present, all of the state stores in the underlying function are declared
private, which may or may not be relevant:

@Transient private lateinit var watermark: ValueState<Long>
@Transient private lateinit var scheduledEvictions: MapState<Long, Long>

Any recommendations or advice would be greatly appreciated and I'll be
happy to provide any additional context/details as needed.

Thanks a lot!

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
