Thanks Stefano - that helped, but it just led to different pain. I think I need to reconsider how I treat these things. Alas, that's the subject of a different thread.
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835

> On Mar 12, 2016, at 12:11 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>
> Hi Ron,
>
> not all classes can be used to `keyBy` a stream with. For your case in
> particular, it looks like you have to implement Comparable so that Flink can
> correctly key your stream based on AggregatableTimesliceImpl.
>
> Take a look at the first slides here for more information on keying:
> http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html
>
> Hope I helped.
>
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com> wrote:
>
> I’m sure this should work, but I’m missing something… I searched the archive
> first, but didn’t have much luck finding any insights there.
>
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be
> used as key.
>
> I’m just getting started with a 1.0 implementation of a new task. It’s a
> pretty straightforward reduce job, but I’m running into a snag with creating
> a KeyedStream.
>
> Here’s the graph:
>
>     StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>     see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>     DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(
>             timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>             new TimesliceDeserializer(),
>             kafkaConsumerProperties));
>
>     SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
>             .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
>             .flatMap(new TimesliceMapper());
>
>     flattenedDataStream
>             .keyBy("accountId", "agentId", "wideMetricId")
>             .timeWindow(Time.seconds(60))
>             .reduce(AggregatableTimeslice::aggregateWith)
>             .print();
>
> This fails on keyBy() with the message:
>
>     Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
>     This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
>
> TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely:
>
>     public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>         @Override
>         public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>             for (Timeslice timeslice : value.getTimeslices()) {
>                 out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>             }
>         }
>     }
>
> AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
>
>     public interface AggregatableTimeslice {
>         int getAccountId();
>         int getAgentId();
>         long getWideMetricId();
>         AggregatableTimesliceStats getTimesliceStats();
>     }
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
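
For readers who hit the same InvalidProgramException: field-expression keys such as keyBy("accountId", ...) only work on types Flink can analyze (POJOs or Tuples); the interface type here is analyzed as a GenericType, so the field expressions fail. One common way around that is to key the stream with an explicit KeySelector, so only the key type (a Tuple of primitives) has to be a valid key. The sketch below was not part of the original thread; it assumes the getters shown above, and the class name TimesliceKeySelector is made up for illustration.

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Keys each element by (accountId, agentId, wideMetricId) without relying on
    // POJO field access, so the GenericType restriction on field-expression keys
    // does not apply. Illustrative sketch only, not the fix used in the thread.
    public class TimesliceKeySelector
            implements KeySelector<AggregatableTimeslice, Tuple3<Integer, Integer, Long>> {
        @Override
        public Tuple3<Integer, Integer, Long> getKey(AggregatableTimeslice value) {
            return new Tuple3<>(value.getAccountId(), value.getAgentId(), value.getWideMetricId());
        }
    }

The original pipeline would then key by the selector instead of field names:

    flattenedDataStream
            .keyBy(new TimesliceKeySelector())
            .timeWindow(Time.seconds(60))
            .reduce(AggregatableTimeslice::aggregateWith)
            .print();

Alternatively, if AggregatableTimesliceImpl is (or can be made) a Flink POJO with a public no-argument constructor and accessible fields or getters/setters, declaring the concrete output type of the flatMap (for example via .returns(AggregatableTimesliceImpl.class)) should let the original keyBy("accountId", "agentId", "wideMetricId") work as written.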