Hi,

As a simple test, you can move your key extraction logic into a MapFunction, 
i.e. MapFunction<T extends SignalSet<?>, Tuple2<String, T>>, and then simply 
key by the first tuple field, which holds the extracted key:

input
  .map(new MyKeyExtractorMapper())
  .keyBy(0)

If that solves your problem, it means that your key extraction is not 
deterministic. This is a problem because getKey() is called at different points 
in time, and if it does not always return the same result for the same element 
you will get that error.
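
To illustrate the point, here is a minimal sketch in plain Java, without any 
Flink dependencies. Everything in it is hypothetical: keyGroupFor() is only a 
simplified stand-in for the real key-group assignment (which hashes 
differently), and the Unstable selector is a contrived example of a key that 
changes between calls, not a claim about what your selector does.

```java
import java.util.function.Function;

class KeyDeterminismSketch {

    // Hypothetical stand-in for key-group assignment: maps a key to one of
    // maxParallelism groups. Not Flink's actual implementation.
    static int keyGroupFor(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Deterministic selector: the key depends only on the record contents.
    static final Function<String, String> STABLE =
            record -> record.split(",")[0];

    // Non-deterministic selector: the key also depends on mutable state,
    // so two calls on the same record return different keys.
    static class Unstable implements Function<String, String> {
        private int calls = 0;

        @Override
        public String apply(String record) {
            return record.split(",")[0] + (calls++);
        }
    }

    // Extracts the key twice, as happens when the key is computed at
    // different points in time, and compares the resulting key groups.
    static boolean sameGroupOnBothCalls(Function<String, String> selector,
                                        String record, int maxParallelism) {
        int first = keyGroupFor(selector.apply(record), maxParallelism);
        int second = keyGroupFor(selector.apply(record), maxParallelism);
        return first == second;
    }

    public static void main(String[] args) {
        System.out.println(sameGroupOnBothCalls(STABLE, "ns,a", 128));         // true
        System.out.println(sameGroupOnBothCalls(new Unstable(), "ns,a", 128)); // false
    }
}
```

With the map-then-keyBy approach the key is computed exactly once and travels 
with the record, so a mismatch like the second case cannot occur.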

Best,
Aljoscha
> On 12. Jun 2017, at 22:04, Meera <mvengadasu...@ebay.com> wrote:
> 
> Did this problem get resolved?
> 
> - I am running into this problem when I parallelize the tasks:
> Unexpected key group index. This indicates a bug.
> 
> - It runs fine with a parallelism of 1. This suggests there is some key
> grouping issue. I checked my Watermark and KeySelector and they look okay.
> 
> Here are the KeySelector and Watermark attached to the KeyedStream:
> public class DimensionKeySelector<T extends SignalSet<?>> implements KeySelector<T, String> {
> 
>       private static final long serialVersionUID = 7666263008141606451L;
>       private final String[] dimKeys;
> 
>       public DimensionKeySelector(Map<String, String> conf) {
>               if (conf.containsKey("dimKeys") == false) {
>                       throw new RuntimeException("Required 'dimKeys' missing.");
>               }
>               this.dimKeys = conf.get("dimKeys").split(",");
>       }
> 
>       @Override
>       public String getKey(T signalSet) throws Exception {
>               StringBuffer group = new StringBuffer(signalSet.namespace());
>               if (signalSet.size() != 0) {
>                       for (String dim : dimKeys) {
>                               if (signalSet.dimensions().containsKey(dim)) {
>                                       group.append(signalSet.dimensions().get(dim));
>                               }
>                       }
>               }
>               return group.toString();
>       }
> }
> 
> and Watermark
> public class WaterMarks extends BoundedOutOfOrdernessTimestampExtractor<MetricSignalSet> {
> 
>    public WaterMarks(Time maxOutOfOrderness) {
>        super(maxOutOfOrderness);
>    }
> 
>    private static final long serialVersionUID = 1L;
> 
>    @Override
>    public long extractTimestamp(MetricSignalSet element) {
>        return element.get(0).timestamp().getTime();
>    }
> }
> 
> Any thoughts?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Function-on-AllWindowed-Stream-Combining-Kafka-Topics-tp12941p13663.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
