Thanks for the feedback!

@Seth Your suggestion should work; I have yet to try it out. However,
relying on undocumented behavior (return CompatibleAsIs and its serializer
will never be used) makes me hesitant to adopt it as a permanent solution.

@Arvid There is no issue with constructing the deserializer via main and
TypeInfo, although a more explicit way to inject dependencies would be
desirable, along the lines of the open method elsewhere. The bigger
limitation I run into is the restore-from-snapshot behavior, which requires
constructing the TypeSerializer with the no-arg constructor and offers no
alternative way to pass in a dependency.

Thanks,
Thomas

On 2021/11/10 13:23:21 Seth Wiesman wrote:
> Yes I did, thanks for sending it back :) Copying my previous reply for
> the ML:
>
> Hey Thomas,
>
> > You are correct that there is no way to inject dynamic information into
> > the TypeSerializer configured from the TypeSerializerSnapshot, but that
> > should not be a problem for your use case.
> >
> > The type serializer instantiated from a TypeSerializerSnapshot is only
> > used to perform schema migrations. Assuming the schema registry enforces
> > that all changes are backwards compatible, your snapshot instance can
> > always return CompatibleAsIs and its serializer will never be used.
> >
> > The tradeoff here is that when the schema does change, Flink will not
> > eagerly migrate all values in state but will instead migrate lazily as
> > state values are updated.
> >
> > Seth
>
> Currently the TypeSerializerSnapshot logic is completely deterministic,
> and my intuition is that we should not change that. Please let us know if
> what I described does not work in practice and we can take it from there.
>
> Seth
>
> On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Thomas,
> >
> > Could you add a sketch of your preferred solution? From what I gathered,
> > you have all the information available in your main (I probably
> > misunderstood that), so what's keeping you from adding the
> > TypeSerializer as a field to your UDF?
> >
> > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> >> Hi,
> >> In a past project I was able to use Spring as a DI provider for Flink
> >> jobs. It actually saved me a lot of hassle while writing/composing
> >> jobs and process functions.
> >> I was able to use all of Spring's bean annotations along with
> >> properties files managed by Spring, just as in a "normal" Spring app.
> >> The dependencies that I was injecting via Spring were not
> >> serialized/deserialized by Flink, which was actually something I
> >> wanted to achieve. In some cases it is very hard, or maybe even
> >> impossible, to make some 3rd-party classes serializable.
> >>
> >> Things to highlight here:
> >> 1. I did it only for the Stream API; I think it could work for the
> >> Table API as well, though.
> >> 2. I was loading the Spring context from ProcessFunction::open.
> >> I was able to customize via job parameters which Spring configuration
> >> I wanted to load. After doing this, all fields annotated with
> >> @Autowired were injected.
> >> 3. I was using standard @Configuration classes.
> >>
> >> Issues:
> >> 1. Since I was using the operator::open method to load the context,
> >> the context will be loaded a few times, depending on the number of
> >> operators deployed on a particular Task Manager. This could, however,
> >> be improved.
> >> 2. The important thing here was that all your classes have to be
> >> "deployed" on every Task Manager/Job Manager in order to load them
> >> through DI. We achieved this by using what is called a "Job session"
> >> cluster, where our custom Flink docker image was built so that it
> >> contains our job jar with all the needed dependencies.
> >>
> >> Because of that, we were not able to use things like AWS EMR or
> >> Kinesis.
> >>
> >> Cheers,
> >> Krzysztof Chmielewski
> >>
> >> On Tue, 9 Nov 2021 at 06:46, Thomas Weise <t...@apache.org> wrote:
> >>
> >>> Hi,
> >>>
> >>> I was looking into a problem that requires a configurable type
> >>> serializer for communication with a schema registry. The service
> >>> endpoint can change, so I would not want to make it part of the
> >>> serializer snapshot but rather resolve it at graph construction time
> >>> (similar to how a Kafka bootstrap URL or JDBC connection URL would
> >>> not be embedded into a checkpoint).
> >>>
> >>> TypeSerializer is instantiated via either TypeInformation or
> >>> TypeSerializerSnapshot. While TypeInformation provides access to
> >>> ExecutionConfig, and therefore the ability to access parameters from
> >>> GlobalJobParameters that could be provided through the entry point,
> >>> restoreSerializer requires the serializer to be constructed from the
> >>> snapshot state alone.
> >>>
> >>> Ideally there would be a dependency injection mechanism for user
> >>> code. Discussion in [1] indicated there isn't a direct solution. Has
> >>> anyone come across a similar use case and found a way to work around
> >>> this limitation? It might be possible to work with a configuration
> >>> singleton that initializes from a file in a well-known location, but
> >>> that depends on the deployment environment and doesn't play nicely
> >>> with testing.
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
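
P.S. For concreteness, here is a rough, untested sketch of how I read the
suggestion: a snapshot whose resolveSchemaCompatibility always reports
CompatibleAsIs, so the serializer restored from the snapshot should (per the
behavior described above) never actually be used. Only the Flink types are
real; MyRecord and the class name are placeholders.

    import java.io.IOException;

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
    import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;

    // MyRecord is a placeholder for the actual record type.
    public class RegistryAwareSerializerSnapshot implements TypeSerializerSnapshot<MyRecord> {

        // Flink instantiates the snapshot via a public no-arg constructor on
        // restore; there is no hook to pass in the registry endpoint here,
        // which is exactly the limitation discussed above.
        public RegistryAwareSerializerSnapshot() {}

        @Override
        public int getCurrentVersion() {
            return 1;
        }

        @Override
        public void writeSnapshot(DataOutputView out) throws IOException {
            // Nothing written: the endpoint is resolved at graph construction
            // time and deliberately kept out of the snapshot.
        }

        @Override
        public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
                throws IOException {
            // Nothing to read back.
        }

        @Override
        public TypeSerializer<MyRecord> restoreSerializer() {
            // Relies on the (undocumented) behavior that this serializer is
            // never used when compatibleAsIs is returned below.
            throw new UnsupportedOperationException(
                "Restored serializer is not expected to be used");
        }

        @Override
        public TypeSerializerSchemaCompatibility<MyRecord> resolveSchemaCompatibility(
                TypeSerializer<MyRecord> newSerializer) {
            // The registry enforces backwards-compatible changes, so the new
            // serializer (built via TypeInformation with the endpoint injected
            // in main) is always accepted as-is.
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
    }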
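
And in case it helps others following along, my reading of Krzysztof's
open()-based Spring approach, sketched in code (very rough; all
application-level names such as MyService, AppConfig and enrich are made up,
only the Flink and Spring classes are real):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class EnrichingFunction extends ProcessFunction<String, String> {

        // Built per task in open(); never serialized as part of the job graph.
        private transient MyService service;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The @Configuration class to load could be chosen from job parameters.
            AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(AppConfig.class);
            service = ctx.getBean(MyService.class);
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(service.enrich(value));
        }
    }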