Hi Ron,

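Not all classes can be used to `keyBy` a stream. In your case, the error
message shows that Flink treats your stream's element type,
AggregatableTimeslice, as a GenericType, and it looks like that type has
to be Comparable before Flink can use it as a key. Note that it is the
declared type AggregatableTimeslice that Flink inspects, so implementing
Comparable only on AggregatableTimesliceImpl is probably not enough.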
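
Here is a rough sketch of what implementing Comparable can look like for a
key. It is plain Java with no Flink dependencies, and TimesliceKey is a
hypothetical class, not something from your code. It combines the three
fields you pass to keyBy (accountId, agentId, wideMetricId). The important
part is that compareTo, equals and hashCode agree, so equal keys always end
up in the same group.

```java
import java.util.Comparator;
import java.util.Objects;

/**
 * Hypothetical composite key built from the three fields used in keyBy(...).
 * Immutable, with equals/hashCode consistent with compareTo, so that keys
 * that compare as equal are also equal and hash the same.
 */
final class TimesliceKey implements Comparable<TimesliceKey> {

    // Orders keys by accountId, then agentId, then wideMetricId.
    private static final Comparator<TimesliceKey> ORDER =
            Comparator.comparingInt((TimesliceKey k) -> k.accountId)
                    .thenComparingInt(k -> k.agentId)
                    .thenComparingLong(k -> k.wideMetricId);

    private final int accountId;
    private final int agentId;
    private final long wideMetricId;

    TimesliceKey(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    @Override
    public int compareTo(TimesliceKey other) {
        return ORDER.compare(this, other);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TimesliceKey)) return false;
        TimesliceKey k = (TimesliceKey) o;
        return accountId == k.accountId
                && agentId == k.agentId
                && wideMetricId == k.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }

    @Override
    public String toString() {
        return "TimesliceKey(" + accountId + ", " + agentId + ", " + wideMetricId + ")";
    }
}
```

How you wire this into the job is left open here. For example, a Flink
KeySelector could build a TimesliceKey from each element, but treat that as
an untested suggestion rather than a confirmed fix.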

Take a look at the first few slides here for more information on keying:
http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html

I hope this helps.

On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com> wrote:

> I’m sure this should work, but I’m missing something… I searched the
> archive first, but didn’t have much luck finding any insights there.
>
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
> be used as key.
>
> I’m just getting started with a 1.0 implementation of a new task. It’s a
> pretty straightforward reduce job, but I’m running into a snag with
> creating a KeyedStream.
>
> Here’s the graph:
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<TimesliceData> dataStream = see.addSource(
>                 new FlinkKafkaConsumer08<>(
>                         timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>                         new TimesliceDeserializer(),
>                         kafkaConsumerProperties));
>
>         SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
>                 .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
>                 .flatMap(new TimesliceMapper());
>
>         flattenedDataStream
>                 .keyBy("accountId", "agentId", "wideMetricId")
>                 .timeWindow(Time.seconds(60))
>                 .reduce(AggregatableTimeslice::aggregateWith)
>                 .print();
>
> This fails on keyBy() with the message:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
> be used as key.
>
> TimesliceMapper is a concrete implementation of
> FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely:
>
> public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>     @Override
>     public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>         for (Timeslice timeslice : value.getTimeslices()) {
>             out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>         }
>     }
> }
>
> AggregatableTimesliceImpl is a simple concrete implementation of the 
> AggregatableTimeslice interface:
>
> public interface AggregatableTimeslice {
>     int getAccountId();
>     int getAgentId();
>     long getWideMetricId();
>     AggregatableTimesliceStats getTimesliceStats();
> }
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
