[ https://issues.apache.org/jira/browse/FLINK-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767430#comment-15767430 ]
ASF GitHub Bot commented on FLINK-5348: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3020#discussion_r93405238 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java --- @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) { } } + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) { + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()])); + checkNotNull(fieldNames, "FieldNames should not be null."); + checkArgument( + types.size() == fieldNames.size(), + "Number of field types and names is different."); + checkArgument( + types.size() == new HashSet<>(fieldNames).size(), + "Field names are not unique."); + + this.fieldNames = new String[fieldNames.size()]; + + for (int i = 0; i < fieldNames.size(); i++) { + this.fieldNames[i] = fieldNames.get(i); + } + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + + if (!matcher.matches()) { + throw new InvalidFieldReferenceException( + "Invalid tuple field reference \"" + fieldExpression + "\"."); + } + + String field = matcher.group(0); + + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) || + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) { + // handle select all + int keyPosition = 0; + for (TypeInformation<?> fType : types) { + if (fType instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) fType; + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; + } else { + result.add(new FlatFieldDescriptor(offset + keyPosition, fType)); + } + keyPosition++; + } + } else { + field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + TypeInformation<?> 
fieldType = null; + if (intFieldMatcher.matches()) { + // field expression is an integer + int fieldIndex = Integer.valueOf(field); + if (fieldIndex > this.getArity()) { + throw new InvalidFieldReferenceException( + "Row field expression \"" + field + "\" out of bounds of " + this.toString() + "."); + } + for (int i = 0; i < fieldIndex; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + fieldType = this.getTypeAt(fieldIndex); + } else { + for (int i = 0; i < this.fieldNames.length; i++) { + if (fieldNames[i].equals(field)) { + // found field + fieldType = this.getTypeAt(i); + break; + } + offset += this.getTypeAt(i).getTotalFields(); + } + if (fieldType == null) { + throw new InvalidFieldReferenceException( + "Unable to find field \"" + field + "\" in type " + this.toString() + "."); + } + } + + String tail = matcher.group(3); + + if (tail == null) { + // expression hasn't nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields("*", offset, result); + } else { + result.add(new FlatFieldDescriptor(offset, fieldType)); + } + } else { + // expression has nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException( + "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + "."); + } + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { --- End diff -- This is the same logic as in `CaseClassTypeInfo.getTypeAt()` only ported to Java, right? > Support custom field names for RowTypeInfo > ------------------------------------------ > > Key: FLINK-5348 > URL: https://issues.apache.org/jira/browse/FLINK-5348 > Project: Flink > Issue Type: Improvement > Components: Core > Reporter: Jark Wu > Assignee: Jark Wu > > Currently, the RowTypeInfo doesn't support optional custom field names, but > forced to generate {{f0}} ~ {{fn}} as field names. 
It would be better to > support custom names, which would benefit some cases (e.g. FLINK-5280). -- This message was sent by Atlassian JIRA (v6.3.4#6332)