Thanks for the response. It is a bit surprising that it is always a new instance, given the various API signatures that take in a Configuration instance. The best practices docs ( https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program ) also sort of mention it, but I just noticed that most of those examples seem to be for the DataSet API rather than the DataStream API (I don't know if there is a big difference between the programming APIs there). I'm still new to these things, so I could be making invalid assumptions, too.
I think I have a simple idea for how to get dependency-style injection working anyway:

- Pass a Serializable "injector"/proxy object in the constructor.
- In "open" (or in the body of the function), get/initialize the things I want that may or may not be Serializable, e.g. an HTTP client or a database connection, from that object.
- Don't use the Configuration instance, since it doesn't do anything anyway.

I haven't thought through any possible security holes or other considerations with this approach yet.

Thanks for the response, that clears up my confusion - now just to explore and find some better ways to test this stuff! (I've put a couple of rough sketches of what I mean at the bottom of this mail.)

On Fri, Sep 22, 2017 at 11:51 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi,
>
> The passing in of a Configuration instance in the open method is actually a
> leftover artifact of the DataStream API that remains only due to API
> backwards compatibility reasons. There's actually no way to modify what
> configuration is retrieved there (and it is actually always a new, empty
> Configuration).
>
> Normally, to inject dependencies into your operators, you would simply do
> that by supplying them through the constructor of the operator and storing
> them as class fields for future use in the operator's work methods. Make
> sure that they are serializable, as the operator will need to be serialized
> when deploying the job. I'm assuming that this should be possible for you
> anyway, since you were trying to write that information into the
> Configuration.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
>
> On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com) wrote:
>
> I'm new to Flink and in the process of trying to write a few operators and
> tests for them. One of the issues I've run into is "how do I properly set
> up the dependencies for an operator". I've discovered the serialization
> constraints and learned some of the execution model as I've started to
> progress through it, but I'm still struggling to find an analog for
> dependency injection in Flink.
>
> I was experimenting with different ways to supply configuration for the
> *Rich* functions to basically set themselves up and tear themselves down
> with their dependencies on open/close. I wanted to basically "inject" a
> dependency, say an HTTP client that caches, and then mock that dependency
> in a local test instead of actually making HTTP calls. It seemed like it
> could be done by getting the correct implementation types from the config
> using some custom injector type (analogous to Spring or Guice dependency
> injection). I know I have to deal with serialization of the operators,
> which is why I was thinking I could do this in open/close and have the
> magical injector be serializable (and possibly be part of the config). This
> may or may not be a bad idea already, but bear with me (and any feedback is
> very appreciated).
>
> I was doing some local testing using StreamExecutionEnvironment, but wasn't
> able to actually pass in configuration options to the local stream
> execution.
>
> I tried it these ways:
>
> 1. Create with a config
>    - StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
> 2. Configure the created LocalStreamEnvironment
>    by env.getConfig().setGlobalJobParameters(configuration)
> 3. Configure the DataStreamSource<Integer>
>    by source.getExecutionConfig().setGlobalJobParameters(configuration)
> 4. Configure the SingleOutputStreamOperator
>    by mapped.getExecutionConfig().setGlobalJobParameters(configuration)
>
> All 4 of those failed, so I felt like I am doing something wrong here, and
> wanted to reach out.
>
> Here is the example code where all of those tests are failing:
>
> import static org.assertj.core.api.Assertions.assertThat;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.DataStreamUtils;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.junit.Test;
>
> import java.util.Iterator;
>
> public class FlinkInspection {
>
>     @Test
>     public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         env.getConfig().setGlobalJobParameters(configuration);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         source.getExecutionConfig().setGlobalJobParameters(configuration);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>         mapped.getExecutionConfig().setGlobalJobParameters(configuration);
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>     static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {
>
>         private int factor = -1;
>
>         @Override
>         public Integer map(final Integer value) throws Exception {
>             return value * factor;
>         }
>
>         @Override
>         public void open(final Configuration parameters) throws Exception {
>             factor = parameters.getInteger("key", 0);
>         }
>     }
> }
>
>
> 1. Any suggestions on how I should think about the dependency injection?
> 2. Are there ways to customize the Configuration that is passed into Rich
>    functions?
> 3. Is this an issue with LocalStreamEnvironment, or am I doing something
>    completely wrong?
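
P.S. For anyone finding this in the archives, here is a rough sketch of the constructor-injection idea from the top of this mail. It is untested, and the names (HttpClient, HttpClientFactory, EnrichingMapper) are made up for illustration; the point is just that only the factory has to be serializable, while the real (possibly non-serializable) client is created in open() and released in close():

import java.io.Serializable;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Made-up client abstraction; in practice this would wrap whatever HTTP library is used.
interface HttpClient {
    String fetch(String url);
    void close();
}

// The serializable "injector"/proxy: it travels with the operator when the job is
// deployed, and only builds the (possibly non-serializable) client at runtime.
interface HttpClientFactory extends Serializable {
    HttpClient create();
}

class EnrichingMapper extends RichMapFunction<String, String> {

    private final HttpClientFactory clientFactory; // serializable, injected via constructor
    private transient HttpClient client;           // created per task instance, never serialized

    EnrichingMapper(HttpClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 'parameters' is ignored on purpose - it is always empty in the DataStream API.
        client = clientFactory.create();
    }

    @Override
    public String map(String value) throws Exception {
        return client.fetch("https://example.invalid/lookup?key=" + value);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

In a test you would then construct the mapper with a factory whose create() returns a stub or mock client; just keep in mind that the factory itself still has to be serializable, so it should build the fake client rather than capture an existing mock instance.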
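
P.P.S. Related to my failed attempt #2 above (setGlobalJobParameters): the best-practices page linked at the top suggests that globally registered parameters are read back through the runtime context, not through the open(Configuration) argument (which, per Gordon, is always empty in the DataStream API). Something along these lines - again untested, GlobalParametersMapper is a made-up name, and the cast has to match whatever type was actually registered with setGlobalJobParameters (the docs use a ParameterTool):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class GlobalParametersMapper extends RichMapFunction<Integer, Integer> {

    private int factor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 'parameters' is always empty; fetch the globally registered parameters
        // from the runtime context instead.
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        factor = ((ParameterTool) globalParams).getInt("key", 0);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        return value * factor;
    }
}

On the driver side that would pair with something like env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args)), so I suspect my test #2 might have worked if the operator had read the parameters this way instead of from the open() argument - I haven't verified that yet, though.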