*(Note: I originally sent this to the users list, but it seems better suited
for this list; sorry if it is not.)*

I'm new to Flink and in the process of trying to write a few functions and
tests for them. I have a feeling some of my terminology is off, so please
correct me where I am wrong. One of the issues I've run into is "how do I
properly set up the dependencies for a function". I've discovered the
serialization constraints and learned some of the execution model as
I've progressed, but I'm still struggling to find an
analog for dependency injection in Flink.

I was experimenting with different ways to supply configuration to the
*Rich* functions so that they can set themselves up and tear
themselves down, along with their dependencies, in open/close. I wanted to
"inject" a dependency, say an HTTP client that caches, and
then mock that dependency in a local test instead of actually making HTTP
calls. It seemed like it could be done by getting the correct
implementation types from the config using some custom injector type
(analogous to Spring or Guice dependency injection). I know I have to deal
with serialization of the operators, which is why I was thinking I could do this
in open/close and have the magical injector be serializable (and possibly
be part of the config). This may or may not be a bad idea already, but bear
with me (and any feedback is very appreciated).
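
A minimal sketch of the idea described above, in plain Java with no Flink on
the classpath. Every name in it (FactorClient, ClientFactory, ScalingFunction,
StubFactory, InjectionSketch) is hypothetical, and ScalingFunction only stands
in for a Rich function: the serializable factory travels with the function,
and the non-serializable client is created in open(). It is meant to
illustrate the shape of the approach, not to claim this is how Flink expects
it to be done.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical dependency, e.g. something backed by a caching HTTP client.
interface FactorClient {
    int fetchFactor();
}

// Hypothetical serializable "injector": shipped with the function, it builds
// the real dependency only after the function has been deserialized.
interface ClientFactory extends Serializable {
    FactorClient create();
}

// Test double: returns a fixed factor instead of making HTTP calls.
class StubFactory implements ClientFactory {
    private static final long serialVersionUID = 1L;
    private final int factor;

    StubFactory(int factor) {
        this.factor = factor;
    }

    @Override
    public FactorClient create() {
        return () -> factor;
    }
}

// Stand-in for a Rich function: the factory is serialized, the client is not.
class ScalingFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ClientFactory factory;
    private transient FactorClient client;

    ScalingFunction(ClientFactory factory) {
        this.factory = factory;
    }

    void open() {
        client = factory.create();
    }

    int map(int value) {
        return value * client.fetchFactor();
    }

    void close() {
        client = null;
    }
}

class InjectionSketch {
    // Round-trips an object through Java serialization, roughly what happens
    // when a function instance is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ScalingFunction shipped = roundTrip(new ScalingFunction(new StubFactory(10)));
        shipped.open();
        System.out.println(shipped.map(1) + ", " + shipped.map(2)); // prints "10, 20"
        shipped.close();
    }
}
```

In a test, the function would be constructed with StubFactory; in production
it would get a factory that builds the real client. Whether the factory is
better passed through the constructor, as here, or looked up from the config
is exactly the part I am unsure about.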

I was doing some local testing using StreamExecutionEnvironment, but wasn't
able to actually pass in configuration options to the local stream
execution. I'm not even sure if this is the right approach to testing, so
any suggestions there would be welcome.

I tried it these ways:

   1. Create the environment with a config
   by StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
   2. Configure the created LocalStreamEnvironment
   by env.getConfig().setGlobalJobParameters(configuration)
   3. Configure the DataStreamSource<Integer>
   by source.getExecutionConfig().setGlobalJobParameters(configuration)
   4. Configure the SingleOutputStreamOperator
   by mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All four of those failed, so I feel like I am doing something wrong here and
wanted to reach out.

Here is the example code in which all of those tests fail:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

    @Test
    public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped =
                source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped =
                source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);

    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        source.getExecutionConfig().setGlobalJobParameters(configuration);

        SingleOutputStreamOperator<Integer> mapped =
                source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);

        SingleOutputStreamOperator<Integer> mapped =
                source.map(new ConfigurationRetrievingOperator());

        mapped.getExecutionConfig().setGlobalJobParameters(configuration);

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }


    static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {

        private int factor = -1;

        @Override
        public Integer map(final Integer value) throws Exception {
            return value * factor;
        }

        @Override
        public void open(final Configuration parameters) throws Exception {
            factor = parameters.getInteger("key", 0);
        }
    }
}


   1. Any suggestions on how I should think about the dependency injection
   aspect?
   2. Are there ways to customize the Configuration that is passed into
   Rich functions?
   3. Is this an issue with LocalStreamEnvironment, or am I doing something
   completely wrong?
   4. Any suggestions for more idiomatic testing, or better testing
   practices?
