Hi Ron, not all classes can be used to `keyBy` a stream with. For your case in particular, it looks like you have to implement Comparable so that Flink can correctly key your stream based on AggregatableTimesliceImpl.
Take a look at the first slides here for more information on keying: http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html Hope I helped. On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com> wrote: > I’m sure this should work, but I’m missing something… I searched the > archive first, but didn’t have much luck finding any insights there. > > TL;DR: org.apache.flink.api.common.InvalidProgramException: This type > (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot > be used as key. > > I’m just getting started with a 1.0 implementation of a new task. It’s a > pretty straightforward reduce job, but I’m running into a snag with > creating a KeyedStream. > > Here’s the graph: > StreamExecutionEnvironment see = > StreamExecutionEnvironment.getExecutionEnvironment(); > see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > DataStream<TimesliceData> dataStream = see.addSource(new > FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME, > new TimesliceDeserializer(), kafkaConsumerProperties)); > > SingleOutputStreamOperator<AggregatableTimeslice> > flattenedDataStream = dataStream > .assignTimestampsAndWatermarks(new > TimesliceTimestampExtractor()) > .flatMap(new TimesliceMapper()); > > flattenedDataStream > .keyBy("accountId", "agentId", "wideMetricId") > .timeWindow(Time.seconds(60)) > .reduce(AggregatableTimeslice::aggregateWith) > .print(); > > This fails on keyBy() with the message: > > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: This type > (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot > be used as key. > > TimesliceMapper is a concrete implementation of > FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely > > public class TimesliceMapper implements FlatMapFunction<TimesliceData, > AggregatableTimeslice> { > @Override > public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> > out) throws Exception { > for (Timeslice timeslice : value.getTimeslices()) { > out.collect(new AggregatableTimesliceImpl(timeslice, value, > value.getAgentId())); > } > } > } > > AggregatableTimesliceImpl is a simple concrete implementation of the > AggregatableTimeslice interface: > > public interface AggregatableTimeslice { > int getAccountId(); > int getAgentId(); > long getWideMetricId(); > AggregatableTimesliceStats getTimesliceStats(); > } > > Ron > — > Ron Crocker > Principal Engineer & Architect > ( ( •)) New Relic > rcroc...@newrelic.com > M: +1 630 363 8835 > > -- BR, Stefano Baghino Software Engineer @ Radicalbit