Hey Ron,

to key a stream by field name (as you did with .keyBy("accountId", "agentId", "wideMetricId")), Flink needs to recognize the class as a POJO. According to the documentation [1], a class is recognized as a POJO when:
- The class must be public.
- It must have a public constructor without arguments (default constructor).
- All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().
- The type of a field must be supported by Flink. At the moment, Flink uses Avro <http://avro.apache.org/> to serialize arbitrary objects (such as Date).

In your case, I believe the type you are trying to use as a POJO is AggregatableTimeslice, and that is only an interface (hence Flink treats it as a GenericType, as you can see from the exception). What you are trying to achieve should work if you turn the interface into a class.

Another option, and the one I would recommend in your situation, is to use a KeySelector function. It's basically a callback that returns the key fields from your type. Since you are using Java 8 anyway, implementing the KeySelector won't take much boilerplate.

I would also recommend registering the implementations of the AggregatableTimeslice interface with Kryo for better performance:

    env.registerType(AggregatableTimesliceImpl.class);

This will make serialization with Kryo much faster.

Regards,
Robert

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html

On Sun, Mar 13, 2016 at 4:04 AM, Ron Crocker <rcroc...@newrelic.com> wrote:

> Thanks Stefano -
>
> That helped, but just led to different pain. I think I need to reconsider
> how I treat these things. Alas, the subject of a different thread.
>
> Ron
> --
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
> On Mar 12, 2016, at 12:11 PM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
> Hi Ron,
>
> not all classes can be used to `keyBy` a stream with.
> For your case in particular, it looks like you have to implement Comparable so that Flink
> can correctly key your stream based on AggregatableTimesliceImpl.
>
> Take a look at the first slides here for more information on keying:
> http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html
>
> Hope I helped.
>
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com>
> wrote:
>
>> I’m sure this should work, but I’m missing something… I searched the
>> archive first, but didn’t have much luck finding any insights there.
>>
>> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
>> be used as key.
>>
>> I’m just getting started with a 1.0 implementation of a new task. It’s a
>> pretty straightforward reduce job, but I’m running into a snag with
>> creating a KeyedStream.
>>
>> Here’s the graph:
>>
>>     StreamExecutionEnvironment see =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>     DataStream<TimesliceData> dataStream = see.addSource(new
>>         FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>>         new TimesliceDeserializer(), kafkaConsumerProperties));
>>
>>     SingleOutputStreamOperator<AggregatableTimeslice>
>>         flattenedDataStream = dataStream
>>             .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
>>             .flatMap(new TimesliceMapper());
>>
>>     flattenedDataStream
>>         .keyBy("accountId", "agentId", "wideMetricId")
>>         .timeWindow(Time.seconds(60))
>>         .reduce(AggregatableTimeslice::aggregateWith)
>>         .print();
>>
>> This fails on keyBy() with the message:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
>> be used as key.
>>
>> TimesliceMapper is a concrete implementation of
>> FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
>>
>>     public class TimesliceMapper implements FlatMapFunction<TimesliceData,
>>             AggregatableTimeslice> {
>>         @Override
>>         public void flatMap(TimesliceData value,
>>                 Collector<AggregatableTimeslice> out) throws Exception {
>>             for (Timeslice timeslice : value.getTimeslices()) {
>>                 out.collect(new AggregatableTimesliceImpl(timeslice, value,
>>                         value.getAgentId()));
>>             }
>>         }
>>     }
>>
>> AggregatableTimesliceImpl is a simple concrete implementation of the
>> AggregatableTimeslice interface:
>>
>>     public interface AggregatableTimeslice {
>>         int getAccountId();
>>         int getAgentId();
>>         long getWideMetricId();
>>         AggregatableTimesliceStats getTimesliceStats();
>>     }
>>
>> Ron
>> --
>> Ron Crocker
>> Principal Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>> M: +1 630 363 8835
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
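
For anyone finding this thread later, here is a minimal sketch of the KeySelector approach recommended in the reply at the top, keyed on the same three IDs as the original keyBy call. Several things in it are assumptions rather than part of the thread: KeySelector is a local stand-in with the same single method as Flink's org.apache.flink.api.java.functions.KeySelector (so the sketch compiles without Flink on the classpath), Slice is a hypothetical implementation used only for the demo, and joining the IDs into a String is just one possible composite key.

```java
public class KeySelectorSketch {

    // Local stand-in (assumption) for org.apache.flink.api.java.functions.KeySelector,
    // so this compiles without Flink. In a real job, import Flink's interface instead.
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // The three ID getters from the interface in the original post.
    // getTimesliceStats() is left out because it plays no part in keying.
    public interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
    }

    // Hypothetical minimal implementation, only for this demo.
    public static final class Slice implements AggregatableTimeslice {
        private final int accountId;
        private final int agentId;
        private final long wideMetricId;

        public Slice(int accountId, int agentId, long wideMetricId) {
            this.accountId = accountId;
            this.agentId = agentId;
            this.wideMetricId = wideMetricId;
        }

        @Override public int getAccountId() { return accountId; }
        @Override public int getAgentId() { return agentId; }
        @Override public long getWideMetricId() { return wideMetricId; }
    }

    // Builds the composite key through the interface getters, so it works for any
    // implementation. A String key is an assumption made for simplicity.
    public static String keyOf(AggregatableTimeslice t) {
        return t.getAccountId() + ":" + t.getAgentId() + ":" + t.getWideMetricId();
    }

    // The selector itself: with Java 8 a method reference is all that is needed.
    public static final KeySelector<AggregatableTimeslice, String> BY_IDS =
            KeySelectorSketch::keyOf;

    public static void main(String[] args) throws Exception {
        System.out.println(BY_IDS.getKey(new Slice(1, 2, 3L)));
    }
}
```

In the job from the original post, the selector would presumably be passed to keyBy(...) in place of the three field names. Because it only calls getters on the interface, the element type no longer has to be a POJO.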