[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-27529: ----------------------------------- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > HybridSourceSplitEnumerator sourceIndex using error Integer check > ----------------------------------------------------------------- > > Key: FLINK-27529 > URL: https://issues.apache.org/jira/browse/FLINK-27529 > Project: Flink > Issue Type: Bug > Components: Connectors / Common > Affects Versions: 1.14.4, 1.15.0, 1.15.1 > Reporter: Ran Tao > Assignee: Ran Tao > Priority: Major > Labels: pull-request-available, stale-assigned > > Currently HybridSourceSplitEnumerator check readerSourceIndex using Integer > type but == operator. > As hybrid source definition, it can concat with more than 2 child sources. so > currently works just because Integer cache(only works <=127), if we have more > sources will fail on error. In a word, we can't use == to compare Integer > index unless we limit hybrid sources only works <=127. > e.g. > {code:java} > Integer i1 = 128; > Integer i2 = 128; > System.out.println(i1 == i2); > int i3 = 128; > int i4 = 128; > System.out.println((Integer) i3 == (Integer) i4); > {code} > It will show false, false. > HybridSource Integer index comparison is below: > {code:java} > @Override > public Map<Integer, ReaderInfo> registeredReaders() { > .... > Integer lastIndex = null; > for (Integer sourceIndex : readerSourceIndex.values()) { > if (lastIndex != null && lastIndex != sourceIndex) { > return filterRegisteredReaders(readers); > } > lastIndex = sourceIndex; > } > return readers; > } > private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, > ReaderInfo> readers) { > Map<Integer, ReaderInfo> readersForSource = new > HashMap<>(readers.size()); > for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) { > if (readerSourceIndex.get(e.getKey()) == (Integer) > sourceIndex) { > readersForSource.put(e.getKey(), e.getValue()); > } > } > return readersForSource; > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)