Thanks Timo. The whole idea is also based on side outputs and output tags. Let me explain it in detail:
1. Introduce a VirtualTableScan (or SideOutputTableScan), which can be optimized into a physical RelNode. Then we can create a source catalog table that is converted into a VirtualTableScan, and users can query it like any normal table.
2. Introduce a WithSideOutput interface, which announces that a node can expose one or more side outputs, each with a specific output tag. For example, StreamExecGroupWindowAggregate can implement it and expose a tag for late messages.
3. After optimization, we get a set of nodes. We can split them into the normal part and the part containing virtual scan nodes. In the normal part, some nodes may expose side outputs. For each output tag, we can derive a SideOutputExecNode that takes the exposing node as its input. Then we check whether each SideOutputExecNode is accepted by any of the virtual scan nodes.
4. Since we may not know the side output type of a WithSideOutput node in advance (and there may be more than one WithSideOutput node), we may expose the data from the VirtualScan in an encoded form. If needed, the virtual scan node will provide the Mapper (Encoding) function. Then we can expand (replace) the virtual scan node with pairs of SideOutputExecNode and Mapper node.
5. Now we have a complete ExecNode DAG and can translate it into a Transformation DAG.

The virtual source catalog table can be built in if the user enables this feature.

-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
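To make steps 1 to 4 of the proposal more concrete, here is a minimal, self-contained Java sketch of the expansion logic. It is only an illustration under stated assumptions: every type below (OutputTag, ExecNode, WithSideOutput, SideOutputExecNode, VirtualTableScan, MapperNode, Calc, GroupWindowAggregate) is a hypothetical, simplified stand-in named after the proposal, not a Flink planner or DataStream class. The "tagId|record" string encoding used by the mapper is also an assumed placeholder for whatever encoding the virtual scan node would actually provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch of the side-output expansion idea. None of these types
 * are Flink classes; they are simplified stand-ins named after the proposal.
 */
public class SideOutputExpansionSketch {

    /** Identifies one side output, e.g. late records of a window aggregate. */
    record OutputTag(String id) {}

    /** Minimal stand-in for a node in the ExecNode DAG. */
    interface ExecNode {
        String describe();
    }

    /** Step 2: a node that can expose one or more tagged side outputs. */
    interface WithSideOutput extends ExecNode {
        List<OutputTag> sideOutputTags();
    }

    /** A node in the normal part that exposes no side outputs. */
    record Calc(String name) implements ExecNode {
        public String describe() { return name; }
    }

    /** Stand-in for StreamExecGroupWindowAggregate exposing a "late-data" tag. */
    record GroupWindowAggregate(String name) implements WithSideOutput {
        public List<OutputTag> sideOutputTags() { return List.of(new OutputTag("late-data")); }
        public String describe() { return name; }
    }

    /** Step 3: reads the side output identified by `tag` from `input`. */
    record SideOutputExecNode(WithSideOutput input, OutputTag tag) implements ExecNode {
        public String describe() { return "SideOutput(" + input.describe() + ", " + tag.id() + ")"; }
    }

    /** Step 1: virtual scan over the built-in catalog table; accepts one tag id. */
    record VirtualTableScan(String acceptedTagId) implements ExecNode {
        boolean accepts(OutputTag tag) { return tag.id().equals(acceptedTagId); }
        public String describe() { return "VirtualTableScan(" + acceptedTagId + ")"; }
    }

    /** Step 4: encodes records of a possibly unknown type into one uniform representation. */
    record MapperNode(ExecNode input, Function<Object, String> encoder) implements ExecNode {
        public String describe() { return "Mapper(" + input.describe() + ")"; }
    }

    /** Steps 3 and 4: derive SideOutputExecNodes and replace the scan with (side output, mapper) pairs. */
    static List<ExecNode> expand(VirtualTableScan scan, List<ExecNode> normalPart) {
        List<ExecNode> replacements = new ArrayList<>();
        for (ExecNode node : normalPart) {
            if (!(node instanceof WithSideOutput withSideOutput)) {
                continue; // only nodes implementing WithSideOutput expose side outputs
            }
            for (OutputTag tag : withSideOutput.sideOutputTags()) {
                SideOutputExecNode sideOutput = new SideOutputExecNode(withSideOutput, tag);
                if (scan.accepts(tag)) {
                    // Hypothetical encoding: prefix each record with its tag id.
                    Function<Object, String> encoder = rec -> tag.id() + "|" + rec;
                    replacements.add(new MapperNode(sideOutput, encoder));
                }
            }
        }
        return replacements;
    }

    public static void main(String[] args) {
        List<ExecNode> normalPart = List.of(
                new Calc("calc"),
                new GroupWindowAggregate("agg1"),
                new GroupWindowAggregate("agg2"));
        for (ExecNode n : expand(new VirtualTableScan("late-data"), normalPart)) {
            System.out.println(n.describe());
        }
    }
}
```

Running `main` prints `Mapper(SideOutput(agg1, late-data))` and `Mapper(SideOutput(agg2, late-data))`: the Calc node is skipped because it exposes no side outputs, and each matching side output gets its own SideOutputExecNode plus Mapper pair. Encoding to a single uniform type is what lets one virtual table accept side outputs from several WithSideOutput nodes whose record types are not known in advance; step 5 (translating the resulting DAG into Transformations) is left out of this sketch.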