[ 
https://issues.apache.org/jira/browse/FLINK-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767430#comment-15767430
 ] 

ASF GitHub Bot commented on FLINK-5348:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3020#discussion_r93405238
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
    @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                }
        }
     
    +   public RowTypeInfo(List<TypeInformation<?>> types, List<String> 
fieldNames) {
    +           super(Row.class, types == null ? null : types.toArray(new 
TypeInformation[types.size()]));
    +           checkNotNull(fieldNames, "FieldNames should not be null.");
    +           checkArgument(
    +                   types.size() == fieldNames.size(),
    +                   "Number of field types and names is different.");
    +           checkArgument(
    +                   types.size() == new HashSet<>(fieldNames).size(),
    +                   "Field names are not unique.");
    +
    +           this.fieldNames = new String[fieldNames.size()];
    +
    +           for (int i = 0; i < fieldNames.size(); i++) {
    +                   this.fieldNames[i] = fieldNames.get(i);
    +           }
    +   }
    +
    +   @Override
    +   public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
    +           Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
    +
    +           if (!matcher.matches()) {
    +                   throw new InvalidFieldReferenceException(
    +                           "Invalid tuple field reference \"" + 
fieldExpression + "\".");
    +           }
    +
    +           String field = matcher.group(0);
    +
    +           if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
    +                   (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
    +                   // handle select all
    +                   int keyPosition = 0;
    +                   for (TypeInformation<?> fType : types) {
    +                           if (fType instanceof CompositeType) {
    +                                   CompositeType<?> cType = 
(CompositeType<?>) fType;
    +                                   
cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, 
result);
    +                                   keyPosition += cType.getTotalFields() - 
1;
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset + keyPosition, fType));
    +                           }
    +                           keyPosition++;
    +                   }
    +           } else {
    +                   field = matcher.group(1);
    +
    +                   Matcher intFieldMatcher = 
PATTERN_INT_FIELD.matcher(field);
    +                   TypeInformation<?> fieldType = null;
    +                   if (intFieldMatcher.matches()) {
    +                           // field expression is an integer
    +                           int fieldIndex = Integer.valueOf(field);
    +                           if (fieldIndex > this.getArity()) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Row field expression \"" + 
field + "\" out of bounds of " + this.toString() + ".");
    +                           }
    +                           for (int i = 0; i < fieldIndex; i++) {
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           fieldType = this.getTypeAt(fieldIndex);
    +                   } else {
    +                           for (int i = 0; i < this.fieldNames.length; 
i++) {
    +                                   if (fieldNames[i].equals(field)) {
    +                                           // found field
    +                                           fieldType = this.getTypeAt(i);
    +                                           break;
    +                                   }
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           if (fieldType == null) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Unable to find field \"" + 
field + "\" in type " + this.toString() + ".");
    +                           }
    +                   }
    +
    +                   String tail = matcher.group(3);
    +
    +                   if (tail == null) {
    +                           // expression hasn't nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields("*", offset, result);
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset, fieldType));
    +                           }
    +                   } else {
    +                           // expression has nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields(tail, offset, result);
    +                           } else {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Nested field expression \"" + 
tail + "\" not possible on atomic type " + fieldType + ".");
    +                           }
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
    --- End diff --
    
    This is the same logic as in `CaseClassTypeInfo.getTypeAt()` only ported to 
Java, right?


> Support custom field names for RowTypeInfo
> ------------------------------------------
>
>                 Key: FLINK-5348
>                 URL: https://issues.apache.org/jira/browse/FLINK-5348
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Currently, the RowTypeInfo doesn't support optional custom field names, but 
> forced to generate {{f0}} ~ {{fn}} as field names. It would be better to 
> support custom names and will benefit some cases (e.g. FLINK-5280).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to