I do not believe this to be possible.

Since the state will most likely affect the function's behavior in some form (usually its output), it may be a better idea to test for that instead. (I suppose you'd want tests like that anyway.)
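
To illustrate, here is a rough sketch of an output-based test rather than a definitive recipe. It assumes a Flink 1.x setup with the flink-streaming-java test utilities on the classpath. `Event` and `MaxTimestampFunction` are hypothetical stand-ins for your log type and your function; the sketch simply makes the function emit its per-key state after every element. If state leaked between keys, the record for tenant "B" would come out with the wrong value.

```kotlin
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical stand-in for the real log type.
data class Event(val tenant: String, val timestamp: Long)

// Hypothetical function: keeps the highest timestamp seen per key
// and emits "<tenant>=<max timestamp>" after every element.
class MaxTimestampFunction : KeyedProcessFunction<String, Event, String>() {
    @Transient private lateinit var watermark: ValueState<Long>

    // Assumes the Flink 1.x open(Configuration) signature.
    override fun open(parameters: Configuration) {
        watermark = runtimeContext.getState(ValueStateDescriptor("watermark", Types.LONG))
    }

    override fun processElement(value: Event, ctx: Context, out: Collector<String>) {
        val current: Long? = watermark.value() // null until the key is first seen
        val updated = maxOf(current ?: Long.MIN_VALUE, value.timestamp)
        watermark.update(updated)
        out.collect("${value.tenant}=$updated")
    }
}

fun main() {
    val harness = KeyedOneInputStreamOperatorTestHarness<String, Event, String>(
        KeyedProcessOperator(MaxTimestampFunction()),
        KeySelector<Event, String> { it.tenant },
        Types.STRING
    )
    harness.open()

    // Interleave two keys; each key should only ever see its own state.
    harness.processElement(StreamRecord(Event("A", 10L)))
    harness.processElement(StreamRecord(Event("B", 5L)))
    harness.processElement(StreamRecord(Event("A", 7L)))

    val expected = listOf("A=10", "B=5", "A=10")
    check(harness.extractOutputValues() == expected) { "state leaked between keys" }

    harness.close()
}
```

In a real test the `check` call would be a JUnit assertion, and the function would not need to emit its state on every element as long as some observable output depends on it.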

On 3/3/2021 8:10 PM, Rion Williams wrote:
Hi all!

Is it possible to apply assertions against the underlying state stores within a KeyedProcessFunction using the existing KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I wanted to ensure that if I passed in two elements each with unique keys that I would be able to query the underlying state stores to ensure they were working as expected. I don’t really see a mechanism that would support such behavior (i.e. give me the state store for key n from the operator?)

@Test
fun `Verify that instances with different keys retain separate watermarks`() {
    // Arrange
    val logs = listOf(
        StreamRecord(TestGenerator.generateLog(tenant = "A")),
        StreamRecord(TestGenerator.generateLog(tenant = "B")),
    )

    // Act
    magicWindowHarness
        .processElements(logs)

    // Assert (I'd like to access the state by key for each here)
    assert(magicWindowHarness.getStateForKey("A"), ...)
    assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible or is there a better way to access the underlying state store? It seemed to work as expected when only a single key was involved, but when multiple keys were involved, things seemed to fall apart. The current testing documentation [0] is fantastic, however I think this might qualify as a more advanced task than it covered.

At present all of the state stores of the underlying function are privately declared, which may/may not be relevant:

@Transient private lateinit var watermark: ValueState<Long>
@Transient private lateinit var scheduledEvictions: MapState<Long, Long>

Any recommendations or advice would be greatly appreciated and I'll be happy to provide any additional context/details as needed.

Thanks a lot!

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

