Thanks, Stefano.

That helped, but it just led to a different kind of pain. I think I need to reconsider how I
treat these types. Alas, that's the subject of a different thread.

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835

> On Mar 12, 2016, at 12:11 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
> 
> Hi Ron,
> 
> not every class can be used as a key with `keyBy`. In your case in
> particular, it looks like you have to implement Comparable so that Flink can
> correctly key your stream based on AggregatableTimesliceImpl.
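A minimal sketch of Stefano's suggestion. It assumes that Flink treats an interface-typed stream as a generic type and accepts a generic type as a key only when it implements Comparable. Because the error names the interface AggregatableTimeslice rather than the Impl, the sketch makes the interface itself extend Comparable. The interface is trimmed (getTimesliceStats() is left out), and the three-argument constructor is hypothetical, so the example compiles on its own.

```java
import java.util.Comparator;
import java.util.Objects;

// Trimmed-down version of the thread's interface: getTimesliceStats() is omitted
// so the sketch compiles on its own.
interface AggregatableTimeslice extends Comparable<AggregatableTimeslice> {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();

    // Orders by the three key fields, in the same order they are passed to keyBy.
    Comparator<AggregatableTimeslice> KEY_ORDER =
            Comparator.comparingInt(AggregatableTimeslice::getAccountId)
                    .thenComparingInt(AggregatableTimeslice::getAgentId)
                    .thenComparingLong(AggregatableTimeslice::getWideMetricId);

    @Override
    default int compareTo(AggregatableTimeslice other) {
        return KEY_ORDER.compare(this, other);
    }
}

final class AggregatableTimesliceImpl implements AggregatableTimeslice {
    private final int accountId;
    private final int agentId;
    private final long wideMetricId;

    // Hypothetical constructor; the thread's Impl is built from (timeslice, value, agentId).
    AggregatableTimesliceImpl(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    @Override public int getAccountId() { return accountId; }
    @Override public int getAgentId() { return agentId; }
    @Override public long getWideMetricId() { return wideMetricId; }

    // Keep equals/hashCode consistent with compareTo: keys are hashed to route
    // records, so equal keys must produce equal hash codes.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AggregatableTimeslice)) return false;
        AggregatableTimeslice t = (AggregatableTimeslice) o;
        return accountId == t.getAccountId()
                && agentId == t.getAgentId()
                && wideMetricId == t.getWideMetricId();
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }
}
```

This only addresses the "cannot be used as key" check. Keying by field-name strings such as "accountId" may still be rejected for a type Flink cannot analyze field by field. keyBy also accepts a KeySelector, which avoids field expressions entirely.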
> 
> Take a look at the first slides here for more information on keying: 
> http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html 
> 
> Hope I helped.
> 
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcroc...@newrelic.com> wrote:
> I’m sure this should work, but I’m missing something… I searched the archive 
> first, but didn’t have much luck finding any insights there.
> 
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
> 
> I’m just getting started with a Flink 1.0 implementation of a new task. It’s a
> pretty straightforward reduce job, but I’m running into a snag with creating
> a KeyedStream.
> 
> Here’s the graph:
>
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(
>                 timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>                 new TimesliceDeserializer(), kafkaConsumerProperties));
>
>         SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
>                 .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
>                 .flatMap(new TimesliceMapper());
>
>         flattenedDataStream
>                 .keyBy("accountId", "agentId", "wideMetricId")
>                 .timeWindow(Time.seconds(60))
>                 .reduce(AggregatableTimeslice::aggregateWith)
>                 .print();
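To make the intended semantics concrete, here is a plain-Java illustration (not Flink API) of what keyBy(...).timeWindow(...).reduce(...) is meant to compute for a single 60-second window: elements that share (accountId, agentId, wideMetricId) are folded together pairwise. Slice, SliceKey, WindowReduceSketch and the count/total fields are hypothetical stand-ins, because the real aggregateWith and AggregatableTimesliceStats are not shown in the thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for AggregatableTimeslice: three key fields plus a
// simple count/total payload in place of AggregatableTimesliceStats.
final class Slice {
    final int accountId;
    final int agentId;
    final long wideMetricId;
    final long count;
    final double total;

    Slice(int accountId, int agentId, long wideMetricId, long count, double total) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
        this.count = count;
        this.total = total;
    }

    // Hypothetical aggregateWith: merges two slices that share the same key.
    Slice aggregateWith(Slice other) {
        return new Slice(accountId, agentId, wideMetricId,
                count + other.count, total + other.total);
    }
}

// Composite key with value-based equals/hashCode, standing in for the
// ("accountId", "agentId", "wideMetricId") key fields.
final class SliceKey {
    final int accountId;
    final int agentId;
    final long wideMetricId;

    SliceKey(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    static SliceKey of(Slice s) {
        return new SliceKey(s.accountId, s.agentId, s.wideMetricId);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SliceKey)) return false;
        SliceKey k = (SliceKey) o;
        return accountId == k.accountId && agentId == k.agentId && wideMetricId == k.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }
}

final class WindowReduceSketch {
    // Folds one window's worth of slices: one output per distinct key,
    // combining same-key slices pairwise with aggregateWith.
    static Map<SliceKey, Slice> reduceWindow(List<Slice> window) {
        return window.stream().collect(Collectors.toMap(
                SliceKey::of, Function.identity(), Slice::aggregateWith));
    }
}
```

The explicit SliceKey with value-based equals/hashCode plays the role that a Flink KeySelector returning a composite key would play.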
> 
> This fails on keyBy() with the message:
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
>
> TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely:
>
> public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>     @Override
>     public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>         for (Timeslice timeslice : value.getTimeslices()) {
>             out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>         }
>     }
> }
>
> AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
>
> public interface AggregatableTimeslice {
>     int getAccountId();
>     int getAgentId();
>     long getWideMetricId();
>     AggregatableTimesliceStats getTimesliceStats();
> }
>
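Another route, sketched under the assumption that Flink's POJO rules apply (a public class with a public no-argument constructor whose fields are public or exposed through getters and setters): if the stream is typed as a concrete POJO instead of the AggregatableTimeslice interface, Flink can analyze the type field by field, so keyBy("accountId", "agentId", "wideMetricId") has real fields to resolve. AggregatableTimeslicePojo is a hypothetical name, and the stats field is omitted for brevity.

```java
// Hypothetical POJO alternative to the AggregatableTimeslice interface.
// In a real job, declare it public (in its own file): Flink's POJO rules
// require a public class. It is package-private here only so the sketch
// compiles next to other top-level types.
class AggregatableTimeslicePojo {
    // Private fields exposed through getters/setters, which the POJO rules allow;
    // the stats field from the original interface is omitted.
    private int accountId;
    private int agentId;
    private long wideMetricId;

    // Public no-argument constructor, required so Flink can instantiate the type.
    public AggregatableTimeslicePojo() {}

    public AggregatableTimeslicePojo(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    public int getAccountId() { return accountId; }
    public void setAccountId(int accountId) { this.accountId = accountId; }

    public int getAgentId() { return agentId; }
    public void setAgentId(int agentId) { this.agentId = agentId; }

    public long getWideMetricId() { return wideMetricId; }
    public void setWideMetricId(long wideMetricId) { this.wideMetricId = wideMetricId; }
}
```

For this to take effect, TimesliceMapper would have to emit the POJO type (FlatMapFunction<TimesliceData, AggregatableTimeslicePojo>), and the stream would be declared with that type rather than the interface.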
> Ron
> 
> 
> 
> -- 
> BR,
> Stefano Baghino
> 
> Software Engineer @ Radicalbit
