Hey Ron,

To access keys of a class by their field names, as you did with
.keyBy("accountId", "agentId", "wideMetricId"), the class needs to be
recognized as a POJO by Flink.
According to the documentation [1], a class is recognized as a POJO when:


   - The class must be public.
   - It must have a public constructor without arguments (default
     constructor).
   - All fields are either public or must be accessible through getter and
     setter functions. For a field called foo the getter and setter methods
     must be named getFoo() and setFoo().
   - The type of a field must be supported by Flink. At the moment, Flink
     uses Avro <http://avro.apache.org/> to serialize arbitrary objects (such
     as Date).
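
As a rough illustration of the rules above, here is a minimal sketch of a class shaped the way the POJO rules ask for. The names TimeslicePojoSketch and TimeslicePojo are hypothetical and only the three key fields are modelled; in a real project the POJO would typically be a top-level public class in its own file rather than a nested one.

```java
public class TimeslicePojoSketch {

    // Hypothetical POJO-style class; the field names mirror the
    // keyBy("accountId", "agentId", "wideMetricId") call from the question.
    public static class TimeslicePojo {
        private int accountId;
        private int agentId;
        private long wideMetricId;

        // Public constructor without arguments.
        public TimeslicePojo() {}

        // Non-public fields are exposed through getFoo()/setFoo() pairs.
        public int getAccountId() { return accountId; }
        public void setAccountId(int accountId) { this.accountId = accountId; }

        public int getAgentId() { return agentId; }
        public void setAgentId(int agentId) { this.agentId = agentId; }

        public long getWideMetricId() { return wideMetricId; }
        public void setWideMetricId(long wideMetricId) { this.wideMetricId = wideMetricId; }
    }

    public static void main(String[] args) {
        TimeslicePojo t = new TimeslicePojo();
        t.setAccountId(1);
        t.setAgentId(2);
        t.setWideMetricId(7L);
        System.out.println(t.getAccountId() + "/" + t.getAgentId() + "/" + t.getWideMetricId());
    }
}
```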

In your case, I believe the class you are trying to use as a POJO is
AggregatableTimeslice, and that's an interface only (hence Flink is
treating it as a GenericType, as you can see from the exception).
What you are trying to achieve should work when you make the interface a
class.
Another option, and the one I would recommend in your situation, is to use
a KeySelector function. It's basically a callback that returns the key
fields from your type. Since you are using Java 8 anyway, implementing the
KeySelector won't take much boilerplate.
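
A minimal sketch of the KeySelector idea, written so that it runs without Flink on the classpath. The nested KeySelector interface is a stand-in with the same shape as Flink's org.apache.flink.api.java.functions.KeySelector, SimpleTimeslice is a hypothetical implementation used only for the demo, and the List key is an assumption; with Flink you would more likely return a Tuple3<Integer, Integer, Long> and pass the selector to keyBy(...).

```java
import java.util.Arrays;
import java.util.List;

public class KeySelectorSketch {

    // Stand-in mirroring the shape of Flink's KeySelector<IN, KEY>.
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // The interface from the question (stats getter omitted for brevity).
    public interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
    }

    // Hypothetical implementation, for demonstration only.
    public static final class SimpleTimeslice implements AggregatableTimeslice {
        private final int accountId;
        private final int agentId;
        private final long wideMetricId;

        public SimpleTimeslice(int accountId, int agentId, long wideMetricId) {
            this.accountId = accountId;
            this.agentId = agentId;
            this.wideMetricId = wideMetricId;
        }

        @Override public int getAccountId() { return accountId; }
        @Override public int getAgentId() { return agentId; }
        @Override public long getWideMetricId() { return wideMetricId; }
    }

    // Composite key built from the three getters; List gives value-based
    // equals/hashCode, so equal ids yield equal keys.
    public static final KeySelector<AggregatableTimeslice, List<Object>> BY_IDS =
            ts -> Arrays.<Object>asList(
                    ts.getAccountId(), ts.getAgentId(), ts.getWideMetricId());

    public static void main(String[] args) throws Exception {
        AggregatableTimeslice a = new SimpleTimeslice(1, 2, 7L);
        AggregatableTimeslice b = new SimpleTimeslice(1, 2, 7L);
        // Two timeslices with the same ids map to the same key.
        System.out.println(BY_IDS.getKey(a).equals(BY_IDS.getKey(b)));
    }
}
```

Because the selector only calls the getters declared on the interface, it works for any implementation of AggregatableTimeslice and does not depend on the type being recognized as a POJO.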

I would also recommend registering the implementations of the
AggregatableTimeslice interface with Kryo:
    env.registerType(AggregatableTimesliceImpl.class);
This will make the serialization with Kryo much faster.

Regards,
Robert

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html


On Sun, Mar 13, 2016 at 4:04 AM, Ron Crocker <rcroc...@newrelic.com> wrote:

> Thanks Stefano -
>
> That helped, but just led to different pain. I think I need to reconsider
> how I treat these things. Alas, the subject of a different thread.
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
> On Mar 12, 2016, at 12:11 PM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
> Hi Ron,
>
> not all classes can be used to `keyBy` a stream with. For your case in
> particular, it looks like you have to implement Comparable so that Flink
> can correctly key your stream based on AggregatableTimesliceImpl.
>
> Take a look at the first slides here for more information on keying:
> http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html
>
> Hope I helped.
>
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com>
> wrote:
>
>> I’m sure this should work, but I’m missing something… I searched the
>> archive first, but didn’t have much luck finding any insights there.
>>
>> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
>> be used as key.
>>
>> I’m just getting started with a 1.0 implementation of a new task. It’s a
>> pretty straightforward reduce job, but I’m running into a snag with
>> creating a KeyedStream.
>>
>> Here’s the graph:
>>         StreamExecutionEnvironment see =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         DataStream<TimesliceData> dataStream = see.addSource(new
>> FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>> new TimesliceDeserializer(), kafkaConsumerProperties));
>>
>>         SingleOutputStreamOperator<AggregatableTimeslice>
>> flattenedDataStream = dataStream
>>                 .assignTimestampsAndWatermarks(new
>> TimesliceTimestampExtractor())
>>                 .flatMap(new TimesliceMapper());
>>
>>         flattenedDataStream
>>                 .keyBy("accountId", "agentId", "wideMetricId")
>>                 .timeWindow(Time.seconds(60))
>>                 .reduce(AggregatableTimeslice::aggregateWith)
>>                 .print();
>>
>> This fails on keyBy() with the message:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot
>> be used as key.
>>
>> TimesliceMapper is a concrete implementation of 
>> FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
>>
>> public class TimesliceMapper implements FlatMapFunction<TimesliceData, 
>> AggregatableTimeslice> {
>>     @Override
>>     public void flatMap(TimesliceData value, 
>> Collector<AggregatableTimeslice> out) throws Exception {
>>         for (Timeslice timeslice : value.getTimeslices()) {
>>             out.collect(new AggregatableTimesliceImpl(timeslice, value, 
>> value.getAgentId()));
>>         }
>>     }
>> }
>>
>> AggregatableTimesliceImpl is a simple concrete implementation of the 
>> AggregatableTimeslice interface:
>>
>> public interface AggregatableTimeslice {
>>     int getAccountId();
>>     int getAgentId();
>>     long getWideMetricId();
>>     AggregatableTimesliceStats getTimesliceStats();
>> }
>>
>> Ron
>> —
>> Ron Crocker
>> Principal Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>> M: +1 630 363 8835
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>
>
>
