Hi,

I'm currently working with Flink for my bachelor thesis and I'm running
into some odd issues regarding factories.

I've built a small "proof of concept" and the code can be found here:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest

The idea is that a Config-singleton holds information or objects to use,
e.g. an AppleFactory (default) which implements a specific IDataFactory
interface. This AppleFactory is then used in a flatMap to create Apples
(objects which implement the IData interface):

>         System.out.println("Factory before processedData: "
>                 + Config.getInstance().getFactory().getClass());
>         DataSet<IData> processedData = this.getEnv()
>                 .fromCollection(inputData)
>                 .flatMap(new FlatMapFunction<Integer, IData>() {
>             @Override
>             public void flatMap(Integer integer, Collector<IData> collector)
>                     throws Exception {
>                 if (integer % 2 == 0) {
>                     collector.collect(Config.getInstance().getFactory().newInstance());
>                 }
>             }
>         });
>         System.out.println("Factory after processedData: "
>                 + Config.getInstance().getFactory().getClass());
>         try {
>             System.out.println("Class created: "
>                     + processedData.collect().get(0).getClass());
>             this.getDataHolder().setDataList(processedData.collect());
>         } catch (Exception e) {
>             e.printStackTrace();
>         }


This happens in the "Config -> initData()" function. My Flink-Job looks
like this:

>     public static void main(String[] args) throws Exception {
>
>         Config c = Config.getInstance(); // Use AppleFactory by default
>
>         // BOOM: Somehow Flink ignores this?
>         c.setFactory(new PearFactory());
>
>         c.initData();
>
>         DataSet<IData> data =
>                 c.getEnv().fromCollection(c.getDataHolder().getDataList());

As you can see, before the "c.initData()" call I set the factory to a
"PearFactory()", which produces Pear-objects (also implementing the
IData interface).
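
For readers who don't want to open the repository, the setup boils down
to roughly the following. This is only a simplified sketch: the class and
method names are the ones used above, but the bodies are assumptions and
the real Config also holds the execution environment and the data holder.

```java
// Simplified stand-ins for the classes in the linked repository.
// Names follow the post; the bodies are assumptions for illustration.
interface IData {}

class Apple implements IData {}

class Pear implements IData {}

interface IDataFactory {
    IData newInstance();
}

class AppleFactory implements IDataFactory {
    @Override
    public IData newInstance() {
        return new Apple();
    }
}

class PearFactory implements IDataFactory {
    @Override
    public IData newInstance() {
        return new Pear();
    }
}

final class Config {
    // One instance per JVM (more precisely, per classloader).
    private static final Config INSTANCE = new Config();

    // AppleFactory is the default until setFactory() is called.
    private IDataFactory factory = new AppleFactory();

    private Config() {}

    static Config getInstance() {
        return INSTANCE;
    }

    IDataFactory getFactory() {
        return factory;
    }

    void setFactory(IDataFactory factory) {
        this.factory = factory;
    }
}
```

Outside of Flink this behaves as I'd expect: after
"Config.getInstance().setFactory(new PearFactory())", every later call to
"Config.getInstance().getFactory().newInstance()" returns a Pear.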

Running the code will print the following text:

> Class created: class factorytest.Data.Apple

This, however, means that Flink didn't pick up (or ignored?) the change
of factory and still creates objects of type Apple.

Instead I'd expect the processedData.collect() list to contain
Pear-objects. What is even more confusing is that the two "Factory
before/after processedData" print statements correctly return the
PearFactory class.
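
One possible explanation, which I have not verified for this job: Flink
serializes the FlatMapFunction and ships it to the workers, and static
state such as a singleton is not part of a serialized object. The plain
Java sketch below (no Flink involved; Settings, Producer and
SerializationDemo are hypothetical names) shows the difference between a
value read from a static field and a value captured in an instance field.
Resetting the static field only simulates a fresh JVM and is an
assumption about what happens on a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class Settings {
    // Static state belongs to the class, not to any serialized object.
    static String kind = "Apple";
}

class Producer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Instance state is written out and restored by Java serialization.
    private final String captured;

    Producer(String captured) {
        this.captured = captured;
    }

    String fromStatic() {
        return Settings.kind;
    }

    String fromField() {
        return captured;
    }
}

class SerializationDemo {
    // Serialize and deserialize the object in memory.
    static Producer roundTrip(Producer p) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Producer) in.readObject();
        }
    }

    static String[] run() throws IOException, ClassNotFoundException {
        Settings.kind = "Pear";                      // comparable to c.setFactory(new PearFactory())
        Producer copy = roundTrip(new Producer(Settings.kind));
        Settings.kind = "Apple";                     // simulate a fresh JVM with the default value
        return new String[] { copy.fromStatic(), copy.fromField() };
    }
}
```

In this sketch the deserialized copy reports "Apple" through the static
lookup and "Pear" through the captured field. If the same thing applies
to my job, passing the factory into the function as a serializable field
instead of looking it up via Config inside flatMap might be a way around
it.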

What's the best way to fix this? Any tips/tricks/questions?

I guess that this issue might be hard to explain in words, so I'd
really appreciate it if someone could have a look at the code and maybe
do an example run:

Job.java:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Job.java

Config.java:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Config.java

Example run:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/EXAMPLE_RUN_OUTPUT.txt

Kind regards,
Sebastian Neef
