JingsongLi commented on code in PR #396: URL: https://github.com/apache/flink-table-store/pull/396#discussion_r1031189498
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java: ########## @@ -62,4 +72,193 @@ public static int[] createIndexMapping( } return null; } + + /** + * Create index mapping from table projection to underlying data projection. + * + * @param tableProjection the table projection + * @param tableFields the fields in table + * @param dataProjection the underlying data projection + * @param dataFields the fields in underlying data + * @return the index mapping + */ + @Nullable + public static int[] createIndexMapping( + int[] tableProjection, + List<DataField> tableFields, + int[] dataProjection, + List<DataField> dataFields) { + return createIndexMapping( + tableProjection, + tableProjection.length, + tableFields, + Collections.emptyList(), + dataProjection, + dataProjection.length, + dataFields, + Collections.emptyList()); + } + + /** + * Create index mapping from table projection to underlying data projection for key value. + * + * @param tableProjection the table projection + * @param tableKeyCount the key count in table + * @param tableKeyFields the key fields in table + * @param tableValueFields the value fields in table + * @param dataProjection the data projection + * @param dataKeyCount the data key count + * @param dataKeyFields the fields in underlying data + * @param dataValueFields the fields in underlying data + * @return the index mapping + */ + @Nullable + public static int[] createIndexMapping( + int[] tableProjection, + int tableKeyCount, + List<DataField> tableKeyFields, + List<DataField> tableValueFields, + int[] dataProjection, + int dataKeyCount, + List<DataField> dataKeyFields, + List<DataField> dataValueFields) { + List<Integer> tableKeyFieldIdList = + tableKeyFields.stream().map(DataField::id).collect(Collectors.toList()); + List<Integer> dataKeyFieldIdList = + dataKeyFields.stream().map(DataField::id).collect(Collectors.toList()); + int[] indexMapping = new int[tableProjection.length]; + + int[] dataKeyProjection = Arrays.copyOf(dataProjection, dataKeyCount); + for (int i = 0; i < tableKeyCount; i++) { + int fieldId = tableKeyFieldIdList.get(tableProjection[i]); + int dataFieldIndex = dataKeyFieldIdList.indexOf(fieldId); + indexMapping[i] = Ints.indexOf(dataKeyProjection, dataFieldIndex); + } + if (tableProjection.length >= tableKeyCount + 2) { + // seq and value kind + for (int i = tableKeyCount; i < tableKeyCount + 2; i++) { + indexMapping[i] = i + dataKeyCount - tableKeyCount; + } + + int[] dataValueProjection = + Arrays.copyOfRange(dataProjection, dataKeyCount + 2, dataProjection.length); + for (int i = 0; i < dataValueProjection.length; i++) { + dataValueProjection[i] = dataValueProjection[i] - dataKeyFields.size() - 2; + } + List<Integer> tableValueFieldIdList = + tableValueFields.stream().map(DataField::id).collect(Collectors.toList()); + List<Integer> dataValueFieldIdList = + dataValueFields.stream().map(DataField::id).collect(Collectors.toList()); + for (int i = tableKeyCount + 2; i < tableProjection.length; i++) { + int fieldId = + tableValueFieldIdList.get(tableProjection[i] - tableKeyFields.size() - 2); + int dataFieldIndex = dataValueFieldIdList.indexOf(fieldId); + int dataValueIndex = Ints.indexOf(dataValueProjection, dataFieldIndex); + indexMapping[i] = + dataValueIndex < 0 ? dataValueIndex : dataValueIndex + dataKeyCount + 2; + } + } + + for (int i = 0; i < indexMapping.length; i++) { + if (indexMapping[i] != i) { + return indexMapping; + } + } + return null; + } + + /** + * Create data projection from table projection. + * + * @param tableFields the fields of table + * @param dataFields the fields of underlying data + * @param tableProjection the projection of table + * @return the projection of data + */ + public static int[][] createDataProjection( + List<DataField> tableFields, List<DataField> dataFields, int[][] tableProjection) { + List<Integer> tableFieldIdList = + tableFields.stream().map(DataField::id).collect(Collectors.toList()); + List<Integer> dataFieldIdList = + dataFields.stream().map(DataField::id).collect(Collectors.toList()); + return Arrays.stream(tableProjection) + .map(p -> Arrays.copyOf(p, p.length)) + .peek( + p -> { + int fieldId = tableFieldIdList.get(p[0]); + p[0] = dataFieldIdList.indexOf(fieldId); + }) + .filter(p -> p[0] >= 0) Review Comment: I may have confused it with `createIndexMapping` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org