Benedict Jin created FLINK-7424: ----------------------------------- Summary: `CEP` component make `KeyedStream` choose wrong channel Key: FLINK-7424 URL: https://issues.apache.org/jira/browse/FLINK-7424 Project: Flink Issue Type: Bug Components: CEP, Streaming Reporter: Benedict Jin Assignee: Benedict Jin
`CEP` component make `KeyedStream` choose wrong channel Origin KeySelector is perfect right. {code:java} public static KeySelector<HBaseServerLog, Integer> buildKeySelector() { return (KeySelector<HBaseServerLog, Integer>) log -> { if (log == null) return 0; Integer flumeId; if ((flumeId = log.getFlumeId()) == null) return 1; return flumeId; }; } {code} After some changes, it will throw {code.java}Key group index out of range of key group range [16, 32){code} exception. {code.java} public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) { return new KeySelector<HBaseServerLog, Integer>() { private Random r = new Random(System.nanoTime()); @Override public Integer getKey(HBaseServerLog log) throws Exception { if (log == null) return 0; Integer flumeId; if ((flumeId = log.getFlumeId()) == null) return 1; return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0); } }; } {code} But, after {code.java}MathUtils.murmurHash(keyHash) % maxParallelism{code} process, it shouldn't be wrong. Actually, when we add some `CEP` component (IterativeCondition/PatternFlatSelectFunction) code after it. It make the {code.java}KeySelector{code} choose wrong channel and throw IllegalArgumentException. -- This message was sent by Atlassian JIRA (v6.4.14#64029)