Thanks for the feedback!

@Seth

Your suggestion should work, I yet have to try it out. However relying on 
undocumented behavior (return CompatibleAsIs and its serializer will never be 
used) would make me hesitant to adopt it as permanent solution.

@Arvid

There is no issue with constructing the deserializer via main and TypeInfo. 
Although a more explicit way to inject dependencies would be desirable, along 
the lines of the open method elsewhere. The bigger limitation I run into is the 
restore from snapshot behavior, which requires to construct the TypeSerializer 
with the no arg constructor and no alternative way to pass a dependency. 

Thanks,
Thomas


On 2021/11/10 13:23:21 Seth Wiesman wrote:
> Yes I did, thanks for sending it back :) Copying my previous reply for the
> ML:
> 
> Hey Thomas,
> >
> > You are correct that there is no way to inject dynamic information into
> > the TypeSerializer configured from the TypeSerializerSnapshot, but that
> > should not be a problem for your use case.
> >
> > The type serializer instantiated from a TypeSerializerSnapshot is only
> > used to perform schema migrations. Assuming the schema registry enforces
> > all changes are backwards compatible, your snapshot instance can always
> > return CompatibleAsIs and its serializer will never be used.
> >
> > The tradeoff here is that when the schema does change, Flink will not
> > eagerly migrate all values in state but instead lazily migrate as state
> > values are updated.
> >
> > Seth
> >
> 
> Currently the TypeSerializerSnapshot logic is completely deterministic, and
> my intuition is that we should not change that. Please let us know if what
> I described does not work in practice and we can take it from there.
> 
> Seth
> 
> On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise <ar...@apache.org> wrote:
> 
> > Hi Thomas,
> >
> > Could you add a sketch of your preferred solution? From what I gathered,
> > you have all the information available in your main (probably misunderstood
> > that), so what's keeping you from adding the TypeSerializer as a field to
> > your UDF?
> >
> > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> >> Hi,
> >> In my past project I was able to use Spring as a DI provider for Flink
> >> Jobs. It actually saves me a lot of hassle while writing/composing jobs and
> >> process functions.
> >> I was able to use all Spring's Bean annotations along with properties
> >> files managed by Spring as it would be a "normal" spring app. The
> >> dependencies that I was injecting via Spring were not
> >> serialized/deserialized by Flink which actually was something that I wanted
> >> to achieved. In some cases it is very hard or maybe even impossible to make
> >> some 3rd party classes serializable.
> >>
> >> Things to highlight here:
> >> 1. I did it only for StreamAPI i think it could work also for TableAPI
> >> though.
> >> 2.I was loading a Spring context from ProcessFunction::open method.
> >> I was able to customize via Job parameters which Spring configuration I
> >> want to load.
> >> After doing this, all fields annotated with @Autowired were injected.
> >> 3, I was using standard @Configuration classes
> >>
> >> Issues:
> >> 1. Since i was using operator::open method to load the context, the
> >> context will be loaded few times depends on the number of operators
> >> deployed on particular Task Manager. This however could be improved.
> >> 2. The important thing here was that all your classes have to be
> >> "deployed" on every Task Manager/Job Manager in order to load them through
> >> DI.
> >> We achieved this by using what is called "Job session" cluster. Where our
> >> custom Flink docker image was build in a way that it contains our job jar
> >> with all dependencies needed.
> >>
> >> Because of that, we were not be able to use things like AWS EMR or
> >> Kinesis.
> >>
> >> Cheers,
> >> Krzysztof Chmielewski
> >>
> >> wt., 9 lis 2021 o 06:46 Thomas Weise <t...@apache.org> napisaƂ(a):
> >>
> >>> Hi,
> >>>
> >>> I was looking into a problem that requires a configurable type
> >>> serializer for communication with a schema registry. The service
> >>> endpoint can change, so I would not want to make it part of the
> >>> serializer snapshot but rather resolve it at graph construction time
> >>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> >>> be embedded into a checkpoint).
> >>>
> >>> TypeSerializer is instantiated via either TypeInformation or
> >>> TypeSerializerSnapshot. While TypeInformation provides access to
> >>> ExecutionConfig and therefore ability to access parameters from
> >>> GlobalJobParameters that could be provided through the entry point,
> >>> restoreSerializer requires the serializer to be constructed from the
> >>> snapshot state alone.
> >>>
> >>> Ideally there would be a dependency injection mechanism for user code.
> >>> Discussion in [1] indicated there isn't a direct solution. Has anyone
> >>> come across a similar use case and found a way to work around this
> >>> limitation? It might be possible to work with a configuration
> >>> singleton that initializes from a file in a well known location, but
> >>> that depends on the deployment environment and doesn't play nice with
> >>> testing.
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
> >>>
> >>
> 

Reply via email to