[ https://issues.apache.org/jira/browse/FLINK-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767435#comment-15767435 ]
ASF GitHub Bot commented on FLINK-5348: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3020#discussion_r93465385 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java --- @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) { } } + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) { + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()])); + checkNotNull(fieldNames, "FieldNames should not be null."); + checkArgument( + types.size() == fieldNames.size(), + "Number of field types and names is different."); + checkArgument( + types.size() == new HashSet<>(fieldNames).size(), + "Field names are not unique."); + + this.fieldNames = new String[fieldNames.size()]; + + for (int i = 0; i < fieldNames.size(); i++) { + this.fieldNames[i] = fieldNames.get(i); + } + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + + if (!matcher.matches()) { + throw new InvalidFieldReferenceException( + "Invalid tuple field reference \"" + fieldExpression + "\"."); + } + + String field = matcher.group(0); + + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) || + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) { + // handle select all + int keyPosition = 0; + for (TypeInformation<?> fType : types) { + if (fType instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) fType; + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; + } else { + result.add(new FlatFieldDescriptor(offset + keyPosition, fType)); + } + keyPosition++; + } + } else { + field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + TypeInformation<?> fieldType = null; + if (intFieldMatcher.matches()) { + // field expression is an integer + int fieldIndex = Integer.valueOf(field); + if (fieldIndex > this.getArity()) { --- End diff -- `TupleTypeBase.getFieldAt()` does check for index bounds as well and throws an `IndexOutOfBoundsException`. So we could simplify this a bit here. > Support custom field names for RowTypeInfo > ------------------------------------------ > > Key: FLINK-5348 > URL: https://issues.apache.org/jira/browse/FLINK-5348 > Project: Flink > Issue Type: Improvement > Components: Core > Reporter: Jark Wu > Assignee: Jark Wu > > Currently, the RowTypeInfo doesn't support optional custom field names, but > forced to generate {{f0}} ~ {{fn}} as field names. It would be better to > support custom names and will benefit some cases (e.g. FLINK-5280). -- This message was sent by Atlassian JIRA (v6.3.4#6332)