Sure.
First, I followed the steps shown here to build the project: flink-htm on
GitHub <https://github.com/htm-community/flink-htm/wiki/Building>

In my project I want to perform anomaly detection on the values in a stream.
I have a Kafka broker as the source:

    DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
            .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
            .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
            .keyBy(0);


The Double field of the Tuple6 is my target value. I tried to follow the
example found in the code downloaded from GitHub, so I transformed my stream
of Tuple6 into a stream of KafkaRecord (an object that has just a value field):

    DataStream<Harness.KafkaRecord> kafkaStream = stream.map(
            new MapFunction<Tuple6<String, String, Date, String, String, Double>, Harness.KafkaRecord>() {

                @Override
                public Harness.KafkaRecord map(
                        Tuple6<String, String, Date, String, String, Double> value) throws Exception {
                    return new Harness.KafkaRecord(value.f5);
                }
            });


Note that if I print stream or kafkaStream, everything works fine.
So I used the HTM functions as in the example:

    DataStream<Tuple2<Double, Double>> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
            .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple2<Double, Double>>() {

                @Override
                public Tuple2<Double, Double> select(
                        Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
                    return new Tuple2<Double, Double>(
                            3.333333, // fake value
                            inference.f1.getAnomalyScore());
                }
            });

I set up my network in a way very similar to the one described in the
example. I just indicated that my value is a "double" instead of a "number"
and changed the encoder from "ScalarEncoder" to
"RandomDistributedScalarEncoder".
I get no error during execution, but if I try to print "result", nothing is
printed. As a further test, I tried writing the DataStream to a file, and
nothing was written there either.
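
For illustration only, the encoder change described above could be pictured
with the sketch below. It uses plain Java maps rather than the real network
classes. The keys "fieldName", "fieldType" and "encoderType", the field name
"value", and the class and method names are all assumptions modeled on the
example harness; none of them is confirmed by this message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for the network's field-encoding settings.
class FieldEncodingSketch {

    // Builds the settings for one field. The key names are assumptions,
    // not a confirmed flink-htm or htm.java API.
    static Map<String, Object> encoderSettings(String fieldName, String fieldType, String encoderType) {
        Map<String, Object> settings = new HashMap<>();
        settings.put("fieldName", fieldName);
        settings.put("fieldType", fieldType);
        settings.put("encoderType", encoderType);
        return settings;
    }

    public static void main(String[] args) {
        // Settings as the example is described above: a "number" with a ScalarEncoder.
        Map<String, Object> example = encoderSettings("value", "number", "ScalarEncoder");

        // Settings after the two changes described above.
        Map<String, Object> modified =
                encoderSettings("value", "double", "RandomDistributedScalarEncoder");

        // One entry per input field, keyed by the (hypothetical) field name.
        Map<String, Map<String, Object>> fieldEncodings = new HashMap<>();
        fieldEncodings.put("value", modified);

        System.out.println("example:  " + example);
        System.out.println("modified: " + fieldEncodings.get("value"));
    }
}
```

Laying the two variants side by side makes it clear that only the field type
and the encoder type differ from the example.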



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-HTM-integration-tp15113p15176.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
