Yuhao Bi created FLINK-25481:
--------------------------------

             Summary: SourceIndex comparison in SplitEnumeratorContextProxy
                 Key: FLINK-25481
                 URL: https://issues.apache.org/jira/browse/FLINK-25481
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.2, 1.13.5, 1.15.0
            Reporter: Yuhao Bi

In [HybridSourceSplitEnumerator.java|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java] the sourceIndex is used by value, but in the following block it is compared by reference after boxing:

{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    // TODO: not start enumerator until readers are ready?
    Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
    if (readers.size() != readerSourceIndex.size()) {
        return filterRegisteredReaders(readers);
    }
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        // Integer reference variable compared by '==' operator
        if (lastIndex != null && lastIndex != sourceIndex) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        // sourceIndex cast to Integer then compared by '==' operator
        if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
{code}

Java caches boxed Integer values in the range -128 to 127, so the code works as long as the source indexes stay within that range. If my understanding is correct, though, it would be better to replace these reference comparisons with an .equals method call.
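To illustrate the difference, here is a minimal standalone sketch. The class and method names (IntegerComparisonDemo, sameByReference, sameByValue) are hypothetical and are not part of Flink; it only contrasts '==' on boxed Integers with a value comparison and is not a proposed patch.

```java
import java.util.Objects;

// Hypothetical demo class for illustration only; not Flink code.
class IntegerComparisonDemo {

    // Mirrors the pattern in the report: '==' on two boxed Integers
    // compares object references, not numeric values.
    static boolean sameByReference(Integer a, Integer b) {
        return a == b;
    }

    // Null-safe value comparison; Objects.equals calls a.equals(b)
    // when a is non-null.
    static boolean sameByValue(Integer a, Integer b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        // Autoboxing goes through Integer.valueOf, which is guaranteed
        // to cache values in the range -128..127.
        Integer small1 = 127;
        Integer small2 = 127;

        // Outside that range the JVM may or may not reuse the same object,
        // so the result of '==' is not something to rely on.
        Integer large1 = 128;
        Integer large2 = 128;

        System.out.println("127 == 127 (boxed): " + sameByReference(small1, small2));
        System.out.println("128 == 128 (boxed): " + sameByReference(large1, large2));
        System.out.println("128 equals 128:     " + sameByValue(large1, large2));
    }
}
```

The first and last lines print true. The middle one usually prints false on a default JVM, but that depends on the configured cache size, which is why a value comparison is the safer choice. Objects.equals is used here only because lastIndex can be null in the original loop; a plain .equals call after the existing null check would behave the same way.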