I’m sure this should work, but I’m missing something… I searched the archive first, but didn’t have much luck finding any insights there.
TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key. I’m just getting started with a 1.0 implementation of a new task. It’s a pretty straightforward reduce job, but I’m running into a snag with creating a KeyedStream. Here’s the graph: StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME, new TimesliceDeserializer(), kafkaConsumerProperties)); SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor()) .flatMap(new TimesliceMapper()); flattenedDataStream .keyBy("accountId", "agentId", "wideMetricId") .timeWindow(Time.seconds(60)) .reduce(AggregatableTimeslice::aggregateWith) .print(); This fails on keyBy() with the message: Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key. TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> { @Override public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception { for (Timeslice timeslice : value.getTimeslices()) { out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId())); } } } AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface: public interface AggregatableTimeslice { int getAccountId(); int getAgentId(); long getWideMetricId(); AggregatableTimesliceStats getTimesliceStats(); } Ron — Ron Crocker Principal Engineer & Architect ( ( •)) New Relic rcroc...@newrelic.com M: +1 630 363 8835