Hey Martijn,

Thank you for the detailed feedback!
> 1. The FLIP has good examples of harness construction via the builder, but
> doesn't address lifecycle management. For comparison, the existing
> DataStream test harnesses (documented at [1]) have explicit open() and
> implement AutoCloseable. Since PTFs can acquire resources in open(), the
> harness needs to manage this lifecycle. Could you clarify how
> open()/close() on the underlying PTF is handled? Is close() called
> automatically, or does the user need to trigger it? An end-to-end example
> showing cleanup would help.

This is a good point! I must have missed adding the AutoCloseable implementation to the public interfaces section - but I did intend to add it! So the PTF would be closed automatically (though a user could also call close() themselves). As for open(), I was planning to call it when the harness is built via `Builder.build()`.

> 2. The existing operator test harnesses support snapshot() and
> initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> cycles. This is important for catching state serialization bugs, which are
> a common source of production issues. The FLIP provides withInitialState()
> for setup, but there's no way to take a snapshot mid-test and restore into
> a fresh harness. Are we deliberately excluding this, or should we consider
> adding it?

I would absolutely consider adding this, thank you for pointing it out. I think being able to take a `.snapshot()` from a harness and then initialise a new harness via a `restore()` on the builder would make sense, as well as perhaps supporting `.restore()` on the harness itself after it has been built.

> 3. Related to the above: PTFs with complex state (Map, List, POJO) can
> behave differently with heap vs. RocksDB backends due to serialization
> differences. The existing harnesses support setStateBackend(). Should the
> PTF test harness support this as well? At minimum, it would be good to
> document which backend is used by default.
I had just intended to support the heap backend and document this, but this is a good point - supporting `setStateBackend()` makes sense here, similar to the existing harnesses. I'll add this to the spec and document the default.

> 4. withTableArgument(String tableName, List<Row> rows) is useful for
> testing join-like PTFs. The builder Javadoc describes when static rows are
> passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> delivery, but a few things remain unclear: How is the schema for these rows
> determined: is it inferred from the Row structure, or does it need to match
> the eval() signature's type hints? And what happens if a PTF reads from a
> table argument that hasn't been configured via the builder: does it receive
> null, or does the harness throw at build time?

I wasn't quite sure what the right approach is here. I thought that inferring the schema from the Row structure would work, but it feels odd to ignore the eval() type hints. Perhaps I can try the Row structure approach first, and explore the second approach if that turns out to be unergonomic. For a PTF that reads from a table argument that hasn't been configured, I think it would receive null, yes.

> - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable.
> This looks like a bug -> should it be Boolean (boxed)?

Oops, good catch. I'm not sure why I marked this as nullable - either a timer has fired or it hasn't, so I don't think returning null makes sense. Returning a non-nullable primitive seems fine here.

> - getOutputByKind(RowKind) implies that output preserves RowKind metadata.
> Could you confirm that getOutput() also retains this? The generic <OUT>
> type parameter could use more specification on what's guaranteed.

I would like getOutput() to retain this as well, but I'm not quite sure what the return type could look like in that case. Perhaps `RowData`? I'm not entirely sure if we have an interface that would cleanly capture this.
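One thought on the getOutput() question: org.apache.flink.types.Row already carries a RowKind via getKind()/setKind(RowKind), so getOutput() could return rows with the kind metadata intact and getOutputByKind(RowKind) would simply be a filter over the same list. Below is a minimal, dependency-free sketch of that shape - all class and method names here are hypothetical stand-ins to illustrate the idea, not the proposed API:

```java
import java.util.ArrayList;
import java.util.List;

public class OutputKindSketch {
    // Stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Stand-in for a row type that carries its RowKind,
    // as org.apache.flink.types.Row does via getKind()/setKind().
    static final class KindedRow {
        final RowKind kind;
        final Object[] fields;
        KindedRow(RowKind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    private final List<KindedRow> output = new ArrayList<>();

    // Called by the harness whenever the PTF emits a row.
    void collect(KindedRow row) {
        output.add(row);
    }

    // getOutput() returns everything, RowKind metadata intact.
    List<KindedRow> getOutput() {
        return new ArrayList<>(output);
    }

    // getOutputByKind(kind) is then just a filter over the same list,
    // so the two accessors can never disagree about kinds.
    List<KindedRow> getOutputByKind(RowKind kind) {
        List<KindedRow> filtered = new ArrayList<>();
        for (KindedRow row : output) {
            if (row.kind == kind) {
                filtered.add(row);
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        OutputKindSketch harness = new OutputKindSketch();
        harness.collect(new KindedRow(RowKind.INSERT, "a", 1));
        harness.collect(new KindedRow(RowKind.UPDATE_BEFORE, "a", 1));
        harness.collect(new KindedRow(RowKind.UPDATE_AFTER, "a", 2));
        System.out.println(harness.getOutput().size());
        System.out.println(harness.getOutputByKind(RowKind.UPDATE_AFTER).size());
    }
}
```

If `<OUT>` stays generic, the kind-preservation guarantee could at least be documented as applying whenever OUT is Row or RowData.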
> - Have you considered optional changelog consistency validation (e.g.,
> verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a
> useful debugging aid.

I hadn't, no, but this is a useful idea. It could be togglable on the builder with a `.withChangelogValidation()` method.

> - What's the error model when eval() or a timer callback throws? Propagated
> directly, or wrapped?

I would say propagated directly, unless you think wrapping them could be useful here.

> - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could
> you clarify whether the harness will be validated against those scenarios,
> or whether it's intended to replace them for certain use cases?

I think just validated against them, as a way of making sure that the harness covers the right set of features we want to capture. I don't think it would replace them in this case.

Thank you a ton for the feedback and ideas! I will update the FLIP documentation based on them - it's very much appreciated.

Kind regards,
Mika

On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> Hey Mika,
>
> Thanks for putting this FLIP together. A dedicated test harness for PTFs is
> a welcome addition. The builder-pattern API and the state/timer
> introspection features are well thought out.
>
> I have a few questions and suggestions after reviewing the FLIP:
>
> 1. The FLIP has good examples of harness construction via the builder, but
> doesn't address lifecycle management. For comparison, the existing
> DataStream test harnesses (documented at [1]) have explicit open() and
> implement AutoCloseable. Since PTFs can acquire resources in open(), the
> harness needs to manage this lifecycle. Could you clarify how
> open()/close() on the underlying PTF is handled? Is close() called
> automatically, or does the user need to trigger it? An end-to-end example
> showing cleanup would help.
>
> 2.
> The existing operator test harnesses support snapshot() and
> initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> cycles. This is important for catching state serialization bugs, which are
> a common source of production issues. The FLIP provides withInitialState()
> for setup, but there's no way to take a snapshot mid-test and restore into
> a fresh harness. Are we deliberately excluding this, or should we consider
> adding it?
>
> 3. Related to the above: PTFs with complex state (Map, List, POJO) can
> behave differently with heap vs. RocksDB backends due to serialization
> differences. The existing harnesses support setStateBackend(). Should the
> PTF test harness support this as well? At minimum, it would be good to
> document which backend is used by default.
>
> 4. withTableArgument(String tableName, List<Row> rows) is useful for
> testing join-like PTFs. The builder Javadoc describes when static rows are
> passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> delivery, but a few things remain unclear: How is the schema for these rows
> determined: is it inferred from the Row structure, or does it need to match
> the eval() signature's type hints? And what happens if a PTF reads from a
> table argument that hasn't been configured via the builder: does it receive
> null, or does the harness throw at build time?
>
> A few smaller points:
>
> - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable.
> This looks like a bug -> should it be Boolean (boxed)?
> - getOutputByKind(RowKind) implies that output preserves RowKind metadata.
> Could you confirm that getOutput() also retains this? The generic <OUT>
> type parameter could use more specification on what's guaranteed.
> - Have you considered optional changelog consistency validation (e.g.,
> verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a
> useful debugging aid.
> - What's the error model when eval() or a timer callback throws?
> Propagated
> directly, or wrapped?
> - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could
> you clarify whether the harness will be validated against those scenarios,
> or whether it's intended to replace them for certain use cases?
>
> Overall I'm +1 on the direction. The core API design is clean and covers
> the main testing needs well. Addressing the lifecycle and
> checkpoint/restore gaps would bring it in line with what Flink users
> already have for DataStream UDF testing.
>
> Thanks,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>
> On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> wrote:
>
> > Hey David,
> >
> > Yeah, I think in terms of scope my aim is more at providing a framework for
> > unit testing the behavior of custom PTFs. I'd like to include as much
> > validation as possible, but there might be validation steps that aren't
> > possible to do without dipping into the engine side of things.
> >
> > I'm not entirely sure on the real/processing time considerations - my aim
> > here was mostly around letting users validate timer behaviour, and timer
> > registration/firing in PTFs is based on watermarks, if I read the doc
> > correctly.
> >
> > Kind regards,
> > Mika
> >
> > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > Hi Mika,
> > > This sounds like a good idea. In terms of scope, is the idea that this
> > is purely for unit tests, or is it additionally proposed as a validation /
> > test harness for use when developing custom PTFs?
> > > I guess this allows us to create a common set of tests that all PTFs
> > need to pass using this harness.
> > >
> > > I would assume there are real (not event) time considerations for some
> > PTFs; it would be worth mentioning how we should handle that.
> > >
> > > Kind regards, David.
> > >
> > > From: Mika Naylor <[email protected]>
> > > Date: Tuesday, 3 March 2026 at 16:46
> > > To: [email protected] <[email protected]>
> > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a ProcessTableFunction
> > Test Harness
> > >
> > > Hey everyone!
> > >
> > > I would like to kick off a discussion on FLIP-567: Introduce a
> > ProcessTableFunction Test Harness[1].
> > >
> > > Currently, testing PTFs requires full integration tests against a running
> > Flink cluster. This FLIP would introduce a developer-friendly test harness
> > for unit testing PTFs and would provide introspection into output, state,
> > timers, and watermarks for assertions and behaviour validation. This would
> > let developers iterate on and test their PTFs without needing to run a
> > full-scale integration test against a live Flink cluster.
> > >
> > > Would love any thoughts and feedback the community might have on this
> > proposal.
> > >
> > > Kind regards,
> > > Mika Naylor
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> > Winchester, Hampshire SO21 2JN
> > >
> > >
