I agree with you. It would be quite useful to be able to access the
ExecutionConfig in the Source API. When I developed the
flink-connector-pulsar, the only configuration I couldn't access was the
checkpoint configuration, which is defined in ExecutionConfig. With it, I
could switch the behavior automatically depending on whether checkpointing
is enabled. Instead, I had to add extra custom configuration options for
the Pulsar Source.
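
For illustration only, the "throw early" check described in the quoted message could look roughly like the sketch below. It deliberately avoids any Flink or NATS API: how a connector would obtain the checkpoint settings is exactly the open question in this thread, so they are passed in as plain values. The class name AckCheckpointGuard, the method validate, and the ackWait parameter are hypothetical names made up for this example.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink or any NATS client.
final class AckCheckpointGuard {

    private AckCheckpointGuard() {}

    /**
     * Fails fast when messages could be redelivered before a checkpoint
     * gets the chance to acknowledge them.
     *
     * @param checkpointingEnabled whether the job has checkpointing turned on (assumed input)
     * @param checkpointInterval   the configured checkpoint interval (assumed input)
     * @param ackWait              how long the external system waits before resending (assumed input)
     */
    static void validate(boolean checkpointingEnabled,
                         Duration checkpointInterval,
                         Duration ackWait) {
        if (!checkpointingEnabled) {
            throw new IllegalStateException(
                    "Checkpointing is disabled, so delivered messages would never be acknowledged.");
        }
        // An interval equal to or longer than the ack wait risks redelivery.
        if (checkpointInterval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + checkpointInterval
                            + " is not shorter than the ack wait " + ackWait + ".");
        }
    }

    public static void main(String[] args) {
        // Passes: checkpoints every 10s, redelivery after 30s.
        validate(true, Duration.ofSeconds(10), Duration.ofSeconds(30));
        System.out.println("configuration accepted");
    }
}
```

A connector would call such a check once at startup, before opening any connection, so that an unsuitable job configuration fails immediately rather than producing duplicates later.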

On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee <[email protected]> wrote:
>
> Hello,
>
> I'm trying to develop Flink connectors to NATS using the new FLIP-27 and 
> FLIP-143 APIs. The scaffolding is more complicated than the old 
> SourceFunction and SinkFunction, but not terrible. However I can't figure out 
> how to access the ExecutionConfig under these new APIs. This was possible in 
> the old APIs by way of the RuntimeContext of the AbstractRichFunction (which 
> is extended by RichSourceFunction and RichSinkFunction).
>
> The reason I would like this is:  some interactions with external systems may 
> be invalid under certain Flink job execution parameters. Consider a system 
> like NATS which allows for acknowledgements of messages received. I would 
> ideally acknowledge all received messages by the source connector during 
> checkpointing. If I fail to acknowledge the delivered messages, after a 
> pre-configured amount of time, NATS would resend the message (which is good 
> in my case for fault tolerance).
>
> However, if a Flink job using these connectors has disabled checkpointing or 
> made the interval too large, the connector will never acknowledge delivered 
> messages and the NATS system may send the message again and cause duplicate 
> data. I would be able to avoid this if I could access the ExecutionConfig to 
> check these parameters and throw early.
>
> I know that the SourceReaderContext gives me access to the Configuration, but 
> that doesn't handle the case where the execution-environment is set 
> programmatically in a job definition rather than through configuration. Any 
> ideas?
>
> Thanks,
> Chris
