Hey Martijn,

Thank you for the detailed feedback!

> 1. The FLIP has good examples of harness construction via the builder, but
> doesn't address lifecycle management. For comparison, the existing
> DataStream test harnesses (documented at [1]) have explicit open() and
> implement AutoCloseable. Since PTFs can acquire resources in open(), the
> harness needs to manage this lifecycle. Could you clarify how
> open()/close() on the underlying PTF is handled? Is close() called
> automatically, or does the user need to trigger it? An end-to-end example
> showing cleanup would help.

This is a good point! I must have missed adding the AutoCloseable implementation 
in the public interfaces section - I did intend to include it! So the harness 
would be closed automatically (though a user could also call close() directly). 
As for open(), I was planning to call it when the harness is built via 
`Builder.build()`.
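
To illustrate (the class and method names here are a strawman sketch, not the 
final API), the intended usage would look something like:

```java
// Strawman sketch - harness/builder names are illustrative, not final.
// build() would invoke the PTF's open(); try-with-resources guarantees close().
try (PtfTestHarness<Row> harness =
        PtfTestHarness.builderFor(new MyProcessTableFunction())
            .build()) {                        // open() called here
    harness.processElement(Row.of("key", 42));
    // assertions on harness.getOutput() ...
}                                              // close() called automatically
```

I'll add an end-to-end example along these lines, including cleanup, to the 
FLIP.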

> 2. The existing operator test harnesses support snapshot() and
> initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> cycles. This is important for catching state serialization bugs, which are
> a common source of production issues. The FLIP provides withInitialState()
> for setup, but there's no way to take a snapshot mid-test and restore into
> a fresh harness. Are we deliberately excluding this, or should we consider
> adding it?

I would absolutely consider adding this, thank you for pointing it out. I think 
being able to take a `.snapshot()` from a harness and then initialise a new 
harness via a `restore()` on the builder would make sense, as well as maybe 
supporting `.restore()` on the harness itself after it has been built.
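
As a rough sketch (names illustrative, mirroring the operator harnesses' 
snapshot()/initializeState() pair):

```java
// Strawman sketch - snapshot mid-test, then restore into a fresh harness.
PtfTestHarness<Row> harness =
    PtfTestHarness.builderFor(new MyPtf()).build();
harness.processElement(Row.of("a", 1));

// Take a snapshot mid-test ...
HarnessSnapshot snapshot = harness.snapshot();

// ... and restore it into a new harness to exercise state serialization.
PtfTestHarness<Row> restored =
    PtfTestHarness.builderFor(new MyPtf())
        .restore(snapshot)
        .build();
```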

> 3. Related to the above: PTFs with complex state (Map, List, POJO) can
> behave differently with heap vs. RocksDB backends due to serialization
> differences. The existing harnesses support setStateBackend(). Should the
> PTF test harness support this as well? At minimum, it would be good to
> document which backend is used by default.

I had just intended to support heap backend and document this, but this is a 
good point - supporting `setStateBackend()` makes sense here, similar to the 
existing harnesses. I'll add this to the spec and document the default.
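
Something like this (again a strawman for the builder, but using Flink's 
existing backend classes):

```java
// Heap (HashMapStateBackend) would remain the documented default;
// setStateBackend() would mirror the existing operator test harnesses.
PtfTestHarness<Row> harness =
    PtfTestHarness.builderFor(new MyPtf())
        .setStateBackend(new EmbeddedRocksDBStateBackend())
        .build();
```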

> 4. withTableArgument(String tableName, List<Row> rows) is useful for
> testing join-like PTFs. The builder Javadoc describes when static rows are
> passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> delivery, but a few things remain unclear: How is the schema for these rows
> determined: is it inferred from the Row structure, or does it need to match
> the eval() signature's type hints? And what happens if a PTF reads from a
> table argument that hasn't been configured via the builder: does it receive
> null, or does the harness throw at build time?

I wasn't quite sure what the right approach is here. I thought that inferring 
the schema from the Row structure would work, but it feels odd to ignore the 
eval() type hints. Perhaps I can try the Row structure approach first and, if 
it turns out to be unergonomic, explore the second approach.

For a PTF that reads from a table argument that hasn't been configured, I think 
it would receive null, yes.
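
For context, configuring a table argument would look roughly like this (names 
other than `withTableArgument` are illustrative):

```java
// withTableArgument(String, List<Row>) as described in the FLIP; here the
// schema would be inferred from the Row instances (still an open question).
PtfTestHarness<Row> harness =
    PtfTestHarness.builderFor(new MyJoinPtf())
        .withTableArgument("dim_table", List.of(
            Row.of(1, "a"),
            Row.of(2, "b")))
        .build();
```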

> 
> - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable.
> This looks like a bug -> should it be Boolean (boxed)?

Oops, good catch. I'm not sure why I marked this as @Nullable - either a timer 
has fired or it hasn't, so returning null doesn't make sense. I'll drop the 
annotation and keep the non-nullable primitive boolean.

> - getOutputByKind(RowKind) implies that output preserves RowKind metadata.
> Could you confirm that getOutput() also retains this? The generic <OUT>
> type parameter could use more specification on what's guaranteed.

I would like getOutput() to retain this too, but I'm not quite sure what the 
return type would look like in that case. Perhaps `RowData`? I'm not entirely 
sure we have an interface that would cleanly capture this.

> - Have you considered optional changelog consistency validation (e.g.,
> verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a
> useful debugging aid.

I hadn't, no, but this is a useful idea. It could be toggleable on the builder 
with a `.withChangelogValidation()` method.
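
To make the check concrete, here's a minimal, self-contained sketch of the 
invariant such validation could enforce. Note the stand-in Kind enum is used 
instead of Flink's RowKind just so the snippet stands alone, and a real check 
would track pairs per key rather than by adjacency:

```java
import java.util.List;

public class ChangelogCheck {
    // Stand-in for org.apache.flink.types.RowKind, for illustration only.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** True if every UPDATE_BEFORE/UPDATE_AFTER forms an adjacent pair. */
    static boolean isConsistent(List<Kind> kinds) {
        for (int i = 0; i < kinds.size(); i++) {
            if (kinds.get(i) == Kind.UPDATE_AFTER
                    && (i == 0 || kinds.get(i - 1) != Kind.UPDATE_BEFORE)) {
                return false; // UPDATE_AFTER without preceding UPDATE_BEFORE
            }
            if (kinds.get(i) == Kind.UPDATE_BEFORE
                    && (i == kinds.size() - 1
                        || kinds.get(i + 1) != Kind.UPDATE_AFTER)) {
                return false; // UPDATE_BEFORE not followed by UPDATE_AFTER
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(List.of(
            Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER))); // true
        System.out.println(isConsistent(List.of(
            Kind.INSERT, Kind.UPDATE_AFTER)));                     // false
    }
}
```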

> - What's the error model when eval() or a timer callback throws? Propagated
> directly, or wrapped?

I would say propagated directly, unless you think wrapping them could be useful 
here.

> - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could
> you clarify whether the harness will be validated against those scenarios,
> or whether it's intended to replace them for certain use cases?

I think just validated against them, as a way of making sure that the harness 
covers the right set of features we want to capture. I don't think it would 
replace them in this case.

Thank you a ton for the feedback and ideas! I will update the FLIP 
documentation based on them - it's very much appreciated.

Kind regards,
Mika

On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> Hey Mika,
> 
> Thanks for putting this FLIP together. A dedicated test harness for PTFs is
> a welcome addition. The builder-pattern API and the state/timer
> introspection features are well thought out.
> 
> I have a few questions and suggestions after reviewing the FLIP:
> 
> 1. The FLIP has good examples of harness construction via the builder, but
> doesn't address lifecycle management. For comparison, the existing
> DataStream test harnesses (documented at [1]) have explicit open() and
> implement AutoCloseable. Since PTFs can acquire resources in open(), the
> harness needs to manage this lifecycle. Could you clarify how
> open()/close() on the underlying PTF is handled? Is close() called
> automatically, or does the user need to trigger it? An end-to-end example
> showing cleanup would help.
> 
> 2. The existing operator test harnesses support snapshot() and
> initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> cycles. This is important for catching state serialization bugs, which are
> a common source of production issues. The FLIP provides withInitialState()
> for setup, but there's no way to take a snapshot mid-test and restore into
> a fresh harness. Are we deliberately excluding this, or should we consider
> adding it?
> 
> 3. Related to the above: PTFs with complex state (Map, List, POJO) can
> behave differently with heap vs. RocksDB backends due to serialization
> differences. The existing harnesses support setStateBackend(). Should the
> PTF test harness support this as well? At minimum, it would be good to
> document which backend is used by default.
> 
> 4. withTableArgument(String tableName, List<Row> rows) is useful for
> testing join-like PTFs. The builder Javadoc describes when static rows are
> passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> delivery, but a few things remain unclear: How is the schema for these rows
> determined: is it inferred from the Row structure, or does it need to match
> the eval() signature's type hints? And what happens if a PTF reads from a
> table argument that hasn't been configured via the builder: does it receive
> null, or does the harness throw at build time?
> 
> A few smaller points:
> 
> - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable.
> This looks like a bug -> should it be Boolean (boxed)?
> - getOutputByKind(RowKind) implies that output preserves RowKind metadata.
> Could you confirm that getOutput() also retains this? The generic <OUT>
> type parameter could use more specification on what's guaranteed.
> - Have you considered optional changelog consistency validation (e.g.,
> verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a
> useful debugging aid.
> - What's the error model when eval() or a timer callback throws? Propagated
> directly, or wrapped?
> - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could
> you clarify whether the harness will be validated against those scenarios,
> or whether it's intended to replace them for certain use cases?
> 
> Overall I'm +1 on the direction. The core API design is clean and covers
> the main testing needs well. Addressing the lifecycle and
> checkpoint/restore gaps would bring it in line with what Flink users
> already have for DataStream UDF testing.
> 
> Thanks,
> 
> Martijn
> 
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> 
> On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> wrote:
> 
> > Hey David,
> >
> > Yeah, I think in terms of scope I aim for more providing a framework for
> > unit testing the behavior of custom PTFs. I'd like to include as much
> > validation as possible but there might be validation steps that aren't
> > possible to do without dipping into the engine side of things.
> >
> > I'm not entirely sure on the real/processing time considerations - my aim
> > here was mostly around letting users validate timer behaviour, and timer
> > registration/firing in PTFs is based on watermarks, if I read the doc
> > correctly.
> >
> > Kind regards,
> > Mika
> >
> > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > Hi Mika,
> > > This sounds like a good idea, in terms of scope, Is the idea that this
> > is purely for unit tests or is this additionally proposed as validation /
> > test harness for use when developing custom PTFs.
> > > I guess this allows us to create a common set of tests that all PTFs
> > need to pass using this harness.
> > >
> > > I would assume there are real (not event)  time considerations for some
> > PTFs, it would be worth mentioning how we should handle that.
> > >
> > >    Kind regards,  David.
> > >
> > > From: Mika Naylor <[email protected]>
> > > Date: Tuesday, 3 March 2026 at 16:46
> > > To: [email protected] <[email protected]>
> > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a ProcessTableFunction
> > Test Harness
> > >
> > > Hey everyone!
> > >
> > > I would like to kick off a discussion on FLIP-567: Introduce a
> > ProcessTableFunction Test Harness[1].
> > >
> > > Currently, testing PTFs require full integration tests against a running
> > Flink cluster. This FLIP would introduce a developer-friendly test harness
> > for unit testing PTFs and would provide introspection to output, state,
> > timers, and watermarks for assertions and behaviour validation. This would
> > let developers iterate and test their PTFs without needing to run a
> > fullscale integration test against a live Flink cluster.
> > >
> > > Would love any thoughts and feedback the community might have on this
> > proposal.
> > >
> > > Kind regards,
> > > Mika Naylor
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> > Winchester, Hampshire SO21 2JN
> > >
> >
> 
