Hi Robert, using the constructor is actually the selected way. Using the existing lifecycle method was an idea to integrate it more with the existing framework design ;-)
Best Christian 2016-01-18 13:38 GMT+01:00 Robert Metzger <[email protected]>: > Hi Christian, > > I think the DataStream API does not allow you to pass any parameter to the > open(Configuration) method. > That method is only used in the DataSet (Batch) API, and its use is > discouraged. > > A much better option to pass a Configuration into your function is as > follows: > > > Configuration mapConf = new Configuration(); > mapConf.setDouble("somthing", 1.2); > > DataStream<Tuple2<String, Integer>> counts = > // split up the lines in pairs (2-tuples) containing: (word,1) > > text.flatMap(new Tokenizer(mapConf)) > // group by the tuple field "0" and sum up tuple field "1" > .keyBy(0).sum(1); > > > And in the Tokenizer: > > public static final class Tokenizer implements FlatMapFunction<String, > Tuple2<String, Integer>> { > private final Configuration mapConf; > > public Tokenizer(Configuration mapConf) { > this.mapConf = mapConf; > } > > > This works as long as the type you're passing is serializable. > > > > On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <[email protected]> > wrote: > >> Hi Max, >> >> maybe I explained it a bit mistakable ;-) >> >> I have a stream-based application which contains a RichFilterFunction >> implementation. The parent provides a lifecycle method open >> (open(Configuration)) which receives a Configuration object as input. I >> would like to use this call to pass options into the operator instance. >> >> Unfortunately, I found no hint where and how to provide the information >> such that I receive them at the described method. Actually, I am accessing >> the surrounding runtime context to retrieve the global job parameters where >> I extract the desired information from. But for some reasons I do not want >> the operator to receive its setup information from the provided >> Configuration instance ;-) >> >> That's why I am looking for the place where the configuration object is >> created and passed into the rich filter function. I would like to insert >> dedicated information for a dedicated filter instance. >> >> Best >> Christian >> >> >> 2016-01-18 12:30 GMT+01:00 Maximilian Michels <[email protected]>: >> >>> Hi Christian, >>> >>> For your implementation, would it suffice to pass a Configuration with >>> your RichFilterFunction? You said the global job parameters are not >>> passed on to your user function? Can you confirm this is a bug? >>> >>> Cheers, >>> Max >>> >>> On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt >>> <[email protected]> wrote: >>> > Hi Fabian, >>> > >>> > thanks for your quick response. I just figured out that I forgot to >>> mention >>> > a small but probably relevant detail: I am working with the streaming >>> api. >>> > >>> > Although there is a way to access the overall job settings, I need a >>> > solution to "reduce" the view on configuration options available on >>> operator >>> > level. >>> > For example, I would like to pass instance specific settings like an >>> > operator identifier but there might be different operators in the >>> overall >>> > program. >>> > >>> > Best >>> > Christian >>> > >>> > 2016-01-13 10:52 GMT+01:00 Fabian Hueske <[email protected]>: >>> >> >>> >> Hi Christian, >>> >> >>> >> the open method is called by the Flink workers when the parallel >>> tasks are >>> >> initialized. >>> >> The configuration parameter is the configuration object of the >>> operator. >>> >> You can set parameters in the operator config as follows: >>> >> >>> >> DataSet<String> text = ... >>> >> DataSet<Tuple2<String, Integer> wc = text.flatMap(new >>> >> Tokenizer()).getParameters().setString("myKey", "myVal"); >>> >> >>> >> Best, Fabian >>> >> >>> >> >>> >> 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <[email protected]>: >>> >>> >>> >>> Hi >>> >>> >>> >>> While working on a RichFilterFunction implementation I was >>> wondering, if >>> >>> there is a much better way to access configuration >>> >>> options read from file during startup. Actually, I am using >>> >>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() >>> >>> to get access to my settings. >>> >>> >>> >>> Reason for that is, that the Configuration parameter provided to the >>> open >>> >>> function does not carry my settings. That is probably >>> >>> the case as I use >>> >>> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to >>> pass my >>> >>> configuration into the environment >>> >>> which in turn is not passed on as part of the open call - I found no >>> >>> other way to handle configuration ;-) >>> >>> >>> >>> My question is: who is responsible for calling the open function, >>> where >>> >>> does the configuration parameter has its origins aka where >>> >>> is its content taken from and is it possible to define somewhere in >>> the >>> >>> main program which configuration to pass into a specific operator? >>> >>> >>> >>> Best >>> >>> Christian >>> >> >>> >> >>> > >>> >> >> >
