Hello, I'm trying to develop Flink connectors for NATS using the new FLIP-27 and FLIP-143 APIs. The scaffolding is more complicated than the old SourceFunction and SinkFunction, but not terrible. However, I can't figure out how to access the ExecutionConfig under these new APIs. This was possible in the old APIs through the RuntimeContext of AbstractRichFunction (which is extended by RichSourceFunction and RichSinkFunction).
The reason I would like this is that some interactions with external systems may be invalid under certain Flink job execution parameters. Consider a system like NATS, which allows received messages to be acknowledged. Ideally, the source connector would acknowledge all received messages during checkpointing. If I fail to acknowledge a delivered message within a pre-configured amount of time, NATS resends it (which is good in my case for fault tolerance). However, if a Flink job using these connectors has disabled checkpointing or made the interval too large, the connector will not acknowledge delivered messages in time (or at all), and NATS may send them again, causing duplicate data. I would be able to avoid this if I could access the ExecutionConfig to check these parameters and throw early. I know that the SourceReaderContext gives me access to the Configuration, but that doesn't handle the case where the execution environment is set up programmatically in a job definition rather than through configuration.
Any ideas?
Thanks,
Chris
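P.S. To make the check concrete, here is a rough sketch of the validation I would like to run when the source starts. It is plain Java with no Flink or NATS dependencies: `AckDeadlineCheck`, `validate`, and its parameters are hypothetical names of my own, not part of either library. The open question is still where the checkpointing values would come from under the new APIs.

```java
import java.time.Duration;

/** Hypothetical startup check; none of these names come from Flink or the NATS client. */
final class AckDeadlineCheck {

    private AckDeadlineCheck() {}

    /**
     * Throws early if delivered messages could be redelivered before a checkpoint
     * gets the chance to acknowledge them.
     *
     * @param checkpointingEnabled whether the job has checkpointing turned on
     * @param checkpointInterval   the job's checkpoint interval (ignored when disabled)
     * @param ackWait              how long NATS waits for an ack before resending
     */
    static void validate(boolean checkpointingEnabled,
                         Duration checkpointInterval,
                         Duration ackWait) {
        if (!checkpointingEnabled) {
            throw new IllegalStateException(
                "Checkpointing is disabled, so delivered messages would never be acknowledged.");
        }
        // Simplifying assumption: the interval must be strictly shorter than the ack wait.
        // A real check would probably also leave headroom for the checkpoint's own duration.
        if (checkpointInterval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                "Checkpoint interval " + checkpointInterval
                    + " is not shorter than the NATS ack wait " + ackWait
                    + ", so messages may be resent before they are acknowledged.");
        }
    }
}
```

With a 10-second checkpoint interval and a 30-second ack wait, `validate` returns normally. With checkpointing disabled, or with a 60-second interval against the same 30-second ack wait, it throws an `IllegalStateException` before any messages are consumed.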