Yes I did, thanks for sending it back :) Copying my previous reply for the ML:
Hey Thomas,

> You are correct that there is no way to inject dynamic information into the TypeSerializer configured from the TypeSerializerSnapshot, but that should not be a problem for your use case.
>
> The type serializer instantiated from a TypeSerializerSnapshot is only used to perform schema migrations. Assuming the schema registry enforces that all changes are backwards compatible, your snapshot instance can always return CompatibleAsIs and its serializer will never be used.
>
> The tradeoff here is that when the schema does change, Flink will not eagerly migrate all values in state but instead migrate them lazily as state values are updated.
>
> Seth

Currently the TypeSerializerSnapshot logic is completely deterministic, and my intuition is that we should not change that. Please let us know if what I described does not work in practice and we can take it from there.

Seth
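P.S. To make the above concrete, here is a rough, untested sketch of such a snapshot against the 1.14-era APIs. The class name is made up, and restoreSerializer() is deliberately left unimplemented here on the assumption that the registry guarantees backwards compatibility, so the migration path is never exercised:

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/** Hypothetical snapshot for a serializer backed by a schema registry. */
public class RegistryBackedSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {

    @Override
    public int getCurrentVersion() {
        return 1;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        // Nothing is written: dynamic information such as the registry
        // endpoint is resolved at graph construction time, not stored here.
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
            throws IOException {
        // Nothing to read, for the same reason.
    }

    @Override
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {
        // The registry enforces backwards compatibility, so the serializer
        // built from the job graph can always read old state (lazily).
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }

    @Override
    public TypeSerializer<T> restoreSerializer() {
        // Per the reasoning above, this should never be invoked for
        // migration as long as compatibleAsIs() is reported.
        throw new UnsupportedOperationException(
                "Migration path unused; registry enforces backwards compatibility");
    }
}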
On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Thomas,
>
> Could you add a sketch of your preferred solution? From what I gathered, you have all the information available in your main method (I probably misunderstood that), so what's keeping you from adding the TypeSerializer as a field to your UDF?
>
> On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> In my past project I was able to use Spring as a DI provider for Flink jobs. It actually saved me a lot of hassle while writing and composing jobs and process functions. I was able to use all of Spring's bean annotations along with properties files managed by Spring, just as in a "normal" Spring app. The dependencies that I was injecting via Spring were not serialized/deserialized by Flink, which was actually something I wanted to achieve. In some cases it is very hard, or maybe even impossible, to make some 3rd-party classes serializable.
>>
>> Things to highlight here:
>> 1. I did it only for the Stream API; I think it could also work for the Table API, though.
>> 2. I was loading the Spring context from the ProcessFunction::open method. I was able to customize via job parameters which Spring configuration I wanted to load. After doing this, all fields annotated with @Autowired were injected.
>> 3. I was using standard @Configuration classes.
>>
>> Issues:
>> 1. Since I was using the operator's open method to load the context, the context will be loaded several times, depending on the number of operators deployed on a particular Task Manager. This could be improved, however.
>> 2. The important thing is that all your classes have to be "deployed" on every Task Manager/Job Manager in order to load them through DI. We achieved this by using what is called a "job session" cluster, where our custom Flink Docker image was built so that it contains our job jar with all needed dependencies.
>>
>> Because of that, we were not able to use things like AWS EMR or Kinesis.
>>
>> Cheers,
>> Krzysztof Chmielewski
>>
>> On Tue, Nov 9, 2021 at 6:46 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I was looking into a problem that requires a configurable type serializer for communication with a schema registry. The service endpoint can change, so I would not want to make it part of the serializer snapshot but rather resolve it at graph construction time (similar to how a Kafka bootstrap URL or JDBC connection URL would not be embedded into a checkpoint).
>>>
>>> TypeSerializer is instantiated via either TypeInformation or TypeSerializerSnapshot. While TypeInformation provides access to ExecutionConfig, and therefore the ability to access parameters from GlobalJobParameters that could be provided through the entry point, restoreSerializer requires the serializer to be constructed from the snapshot state alone.
>>>
>>> Ideally there would be a dependency injection mechanism for user code. Discussion in [1] indicated there isn't a direct solution. Has anyone come across a similar use case and found a way to work around this limitation? It might be possible to work with a configuration singleton that initializes from a file in a well-known location, but that depends on the deployment environment and doesn't play nice with testing.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
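For the archives, a minimal, untested sketch of the graph-construction-time wiring Thomas describes: threading the endpoint through GlobalJobParameters into TypeInformation#createSerializer, so it never becomes part of a checkpoint. The abstract subclass and the "schema.registry.url" key are hypothetical:

import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/** Hypothetical TypeInformation that resolves the registry endpoint at graph construction time. */
public abstract class RegistryAwareTypeInfo<T> extends TypeInformation<T> {

    @Override
    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
        // GlobalJobParameters can be populated in the entry point, e.g.
        // env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        Map<String, String> params = config.getGlobalJobParameters().toMap();
        // Hypothetical key, resolved per deployment like a Kafka bootstrap URL.
        String endpoint = params.get("schema.registry.url");
        return createSerializer(endpoint);
    }

    // Concrete subclasses build the actual registry-backed serializer.
    protected abstract TypeSerializer<T> createSerializer(String registryEndpoint);
}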