Hi,

I tried removing the returns() call, but when I do, the program fails with an error (which is odd, since the return value is a Double).
I'm absolutely sure env.execute() is called, because I see other streams printed. All parts of the program are connected, and I followed exactly the example shown in the library, so I think this is a bug that I need to solve. The inference stream on which the map function is called definitely contains records.

Thank you


Fabian Hueske-2 wrote
> Hi Andrea,
>
> a MapFunction calls its map() function for each stream element and returns
> exactly one result value.
> MapFunctions are used for 1-to-1 transformations.
> The returns() method allows to specify the return type of an operator, in
> your case the MapOperator. It is only necessary if Flink cannot
> automatically determine the return type of an operator.
>
> It's not easy to identify what is going on from the code you posted.
> Are you sure the program is executed, i.e., did you call env.execute()?
> Are all parts of the program connected?
> Are you sure that the input stream of the Map operator emits records?
>
> Best, Fabian
>
>
> 2017-09-02 19:23 GMT+02:00 AndreaKinn <kinn6aer@>:
>
>> Hi,
>> Excuse me for the unclear title but I don't know how to summarise the
>> question.
>> I'm using an external library integrated with Flink called Flink-HTM.
>> It is still a prototype.
>> Internally, it performs everything I want but I have a problem
>> returning evaluated values in a printable datastream.
>> I posted my question here because I believe the problem is tied to
>> Flink and not to the library.
>>
>> Essentially I have the following code in my main:
>>
>>     DataStream<Double> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
>>         .select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
>>             @Override
>>             public Double select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
>>                 return inference.f1.getAnomalyScore();
>>             }
>>         });
>>
>> Then I want to print the datastream "result".
>> Following the /learn/ method the flink-htm lib correctly performs many
>> operations on data.
>> At the end of this computation, in another class I have a
>> /DataStream<T, NetworkInference>/ and essentially I have to call the
>> overridden "/select/" method on that /DataStream<T, NetworkInference>/.
>>
>> The code which would do that is:
>>
>>     final DataStream<Tuple2<T, NetworkInference>> inferenceStream =
>>         inferenceStreamBuilder.build();
>>
>>     return inferenceStream
>>         .map(new InferenceSelectMapper<T, R>(clean(inferenceSelectFunction)))
>>         .returns(returnType);
>>
>> where /map/ and /returns/ methods are described in Flink's
>> /DataStream.class/.
>>
>>     public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
>>
>>         TypeInformation<R> outType =
>>             TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
>>                 Utils.getCallLocationName(), true);
>>
>>         return transform("Map", outType, new StreamMap<>(clean(mapper)));
>>     }
>>
>>     public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
>>         requireNonNull(typeInfo, "TypeInformation must not be null");
>>
>>         transformation.setOutputType(typeInfo);
>>         return this;
>>     }
>>
>> while /InferenceSelectMapper<T,R>/ is the following class:
>>
>>     private static class InferenceSelectMapper<T, R> implements
>>             MapFunction<Tuple2<T, NetworkInference>, R> {
>>
>>         private final InferenceSelectFunction<T, R> inferenceSelectFunction;
>>
>>         public InferenceSelectMapper(InferenceSelectFunction<T, R> inferenceSelectFunction) {
>>             this.inferenceSelectFunction = inferenceSelectFunction;
>>         }
>>
>>         @Override
>>         public R map(Tuple2<T, NetworkInference> value) throws Exception {
>>             return inferenceSelectFunction.select(value);
>>         }
>>     }
>>
>> which implements Flink's /MapFunction/.
>> I absolutely need the program to call the /InferenceSelectMapper.map()/
>> method so that my "/select/" function is called; unfortunately this
>> doesn't happen. As a consequence, in the main method and in the IDE
>> console, I suppose the /DataStream result/ is not filled and no output
>> is printed, which is my fundamental problem.
>>
>> Since I'm not a Flink expert I don't know how to perform many
>> operations at a "lower level".
>> Honestly I don't understand exactly what the /map/ and /returns/ methods
>> of /DataStream.class/ do. I thought a lot about it and I also tried to
>> find a way to call the /InferenceSelectMapper.map()/ method myself, but I
>> don't know how to extract the /Tuple2<T, NetworkInference>/ from the
>> /DataStream<Tuple2<...>>/.
>>
>> I'm absolutely sure that the /map/ function I need in
>> /InferenceSelectMethod/ is not called, because it doesn't appear in the
>> call hierarchy and a print statement I added to it is never shown.
>>
>> Please, can you help me to solve this? I've been stuck on it for a week
>> and the lib's owner doesn't reply to my mails.
>> Sorry for the length.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
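
A note on the error reported at the top when returns() is removed: a likely cause (an assumption, since the error text was not posted) is Java type erasure. The quoted /InferenceSelectMapper<T, R>/ declares its output type only as the type variable R, so nothing in the class itself says "Double", which is the situation Fabian describes where the return type cannot be determined automatically. The sketch below uses plain JDK reflection only; /Mapper/, /GenericMapper/ and /declaredOutputType/ are hypothetical names made up for the illustration and are not Flink or flink-htm APIs.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {

    /** Hypothetical stand-in for a one-argument mapping interface (not Flink's MapFunction). */
    interface Mapper<I, O> {
        O map(I value);
    }

    /** Generic wrapper with the same shape as the quoted InferenceSelectMapper<T, R>. */
    static class GenericMapper<I, O> implements Mapper<I, O> {
        private final Mapper<I, O> delegate;

        GenericMapper(Mapper<I, O> delegate) {
            this.delegate = delegate;
        }

        @Override
        public O map(I value) {
            return delegate.map(value);
        }
    }

    /** Returns the output type argument recorded in the class's declared Mapper interface, or null. */
    static Type declaredOutputType(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Mapper.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Anonymous class: the concrete type arguments are recorded in its class file.
        Mapper<String, Double> concrete = new Mapper<String, Double>() {
            @Override
            public Double map(String value) {
                return Double.valueOf(value);
            }
        };
        // Generic wrapper: only the type variable O is recorded, not Double.
        Mapper<String, Double> wrapped = new GenericMapper<>(concrete);

        System.out.println(declaredOutputType(concrete)); // class java.lang.Double
        System.out.println(declaredOutputType(wrapped));  // O
    }
}
```

If this is indeed what happens here, it would explain why the library passes the type explicitly through returns(returnType), and why removing that call fails even though the anonymous /InferenceSelectFunction/ in main clearly returns a Double.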