I agree with you. It would be quite useful to access the ExecutionConfig in the Source API. When I developed flink-connector-pulsar, the only configuration I couldn't access was the checkpoint configuration, which is defined in the ExecutionConfig. With it, the source could switch its behavior automatically depending on whether checkpointing is enabled. Without it, I have to add more custom configuration options to the Pulsar Source.
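For illustration, here is a minimal sketch of the kind of automatic switch described above, assuming the source could somehow learn the checkpoint interval (which is exactly what the Source API does not expose today, so the sketch takes it as a parameter). The `AckMode` enum and `AckModeSelector` class are hypothetical names for this example, not part of Flink or of flink-connector-pulsar.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical acknowledgement modes for a message-queue source;
// NOT part of the Flink or Pulsar connector APIs.
enum AckMode {
    ON_CHECKPOINT, // acknowledge only after a checkpoint completes
    IMMEDIATE      // acknowledge as soon as a record is emitted (at-most-once)
}

final class AckModeSelector {
    private AckModeSelector() {}

    /**
     * Picks an ack mode from the checkpoint interval, if one is known.
     * An empty Optional, or a zero/negative interval, stands in for
     * "checkpointing is disabled" (an assumption of this sketch).
     */
    static AckMode select(Optional<Duration> checkpointInterval) {
        return checkpointInterval
                .filter(interval -> !interval.isZero() && !interval.isNegative())
                .map(interval -> AckMode.ON_CHECKPOINT)
                .orElse(AckMode.IMMEDIATE);
    }
}
```

With a value such as `Optional.of(Duration.ofSeconds(10))` the selector returns `ON_CHECKPOINT`; with `Optional.empty()` it falls back to `IMMEDIATE`. Without access to the real setting, a connector has to ask the user for an extra option to feed this decision instead.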
On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee wrote:
>
> Hello,
>
> I'm trying to develop Flink connectors to NATS using the new FLIP-27 and FLIP-143 APIs. The scaffolding is more complicated than the old SourceFunction and SinkFunction, but not terrible. However, I can't figure out how to access the ExecutionConfig under these new APIs. This was possible in the old APIs by way of the RuntimeContext of the AbstractRichFunction (which is extended by RichSourceFunction and RichSinkFunction).
>
> The reason I would like this is that some interactions with external systems may be invalid under certain Flink job execution parameters. Consider a system like NATS, which allows for acknowledgements of received messages. Ideally, the source connector would acknowledge all received messages during checkpointing. If I fail to acknowledge the delivered messages, NATS resends them after a pre-configured amount of time (which is good in my case for fault tolerance).
>
> However, if a Flink job using these connectors has disabled checkpointing or made the interval too large, the connector will never acknowledge delivered messages, and NATS may send them again and cause duplicate data. I could avoid this if I could access the ExecutionConfig to check these parameters and throw early.
>
> I know that the SourceReaderContext gives me access to the Configuration, but that doesn't handle the case where the execution environment is set programmatically in a job definition rather than through configuration. Any ideas?
>
> Thanks,
> Chris
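The "throw early" check Chris describes could look roughly like the sketch below, assuming the connector had the checkpoint interval available (again, the missing piece) and knew the broker's redelivery timeout. Here `ackWait` is assumed to model the NATS JetStream consumer's AckWait setting; `CheckpointAckValidator` and its parameters are hypothetical names, not an existing Flink or NATS API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical fail-fast check for a source that acknowledges messages
// only on checkpoint. Both inputs are passed in explicitly because a
// FLIP-27 reader cannot currently read the programmatic checkpoint setting.
final class CheckpointAckValidator {
    private CheckpointAckValidator() {}

    /**
     * @param checkpointInterval empty (or zero/negative) means checkpointing is disabled
     * @param ackWait            how long the broker waits before redelivering an unacked message
     * @throws IllegalStateException if messages would be redelivered before being acknowledged
     */
    static void validate(Optional<Duration> checkpointInterval, Duration ackWait) {
        Duration interval = checkpointInterval
                .filter(i -> !i.isZero() && !i.isNegative())
                .orElseThrow(() -> new IllegalStateException(
                        "Checkpointing is disabled; messages would never be acknowledged"));
        // Simplification: compares only the interval, ignoring how long a checkpoint takes to complete.
        if (interval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + interval + " is not shorter than ack wait "
                            + ackWait + "; the broker would redeliver messages before they are acknowledged");
        }
    }
}
```

Calling this from the reader's setup would fail the job at startup rather than silently producing duplicates. A production check would probably also leave headroom for checkpoint duration, not just compare the interval.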
