[ https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773466#comment-17773466 ]

Andriy Onyshchuk commented on FLINK-33200:
------------------------------------------

Hello, team. Here is a brief summary of this bug.
h3. Given

While working on a projection over a dataset that contains data represented as an array of maps of string to string (`array[map[string, string]]`), several issues were identified.
h3. Data Schema

```
DataTypes.ROW(
  DataTypes.FIELD("id", DataTypes.STRING()),
  DataTypes.FIELD("events", DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
)
```
h3. Issues
h4. Issue 1. Extraction of map values by index and key

```
inTable.select(
  Expressions.$("events").at(1).at("eventType").as("firstEventType")
).execute().print();
```

results in
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Map entry access needs a valid key of type 'StringData', found 'String'.
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:284)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:164)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:164)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:98)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:97)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:147)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:98)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:97)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:92)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:93)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$concatenateRules$2(ExpressionResolver.java:247)
    at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:210)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:199)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:174)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:142)
    at IssueDemo.main(IssueDemo.java:73)
```
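
As a side note, expressing the same projection in SQL may avoid the legacy Table API type inference path entirely, since SQL map access is resolved by the SQL planner. A hedged, untested workaround sketch, using `tenv` and `inTable` from the reproduction below (`in_table` is just an illustrative view name):

```
// Hypothetical workaround sketch (untested): route the projection through SQL
// instead of the Table API expression DSL.
tenv.createTemporaryView("in_table", inTable);
tenv.sqlQuery("SELECT events[1]['eventType'] AS firstEventType FROM in_table")
    .execute()
    .print();
```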
h4. Issue 2. Extraction of an entire map by index

```
inTable.select(
  Expressions.$("events").at(1).as("firstEvent")
).execute().print();
```

results in
```
Query schema: [firstEvent: MAP<STRING, STRING>]
Sink schema: [firstEvent: MAP<RAW('org.apache.flink.table.data.StringData', ?), RAW('org.apache.flink.table.data.StringData', ?)>]
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1005)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:349)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:307)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:268)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertCollectToRel(DynamicSinkUtils.java:154)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:230)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:947)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
    at IssueDemo.main(IssueDemo.java:80)
```
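
The same SQL route might sidestep this issue as well, since the query schema would then be derived by the SQL planner rather than from the legacy type info. Again a hedged, untested sketch, reusing the `in_table` view from the workaround above:

```
// Hypothetical, untested: select the whole map via SQL; the expectation is a
// query schema of MAP<STRING, STRING> instead of the RAW StringData types above.
tenv.sqlQuery("SELECT events[1] AS firstEvent FROM in_table")
    .execute()
    .print();
```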
h3. Issue Reproduction

Full `IssueDemo.java` source code:

```
import lombok.AllArgsConstructor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IssueDemo {

  @AllArgsConstructor
  public static class DataEnvelope {
    public final String id;
    public final List<Map<String, String>> events;

    public RowData toRowData() {
      final var row = new GenericRowData(2);
      row.setField(0, StringData.fromString(id));
      row.setField(1, eventsMapData());
      return row;
    }

    private ArrayData eventsMapData() {
      return new GenericArrayData(events.stream()
        .map(event -> new GenericMapData(event
          .entrySet()
          .stream()
          .collect(Collectors.toMap(
            e -> StringData.fromString(e.getKey()),
            e -> StringData.fromString(e.getValue())
          ))))
        .toArray(Object[]::new)
      );
    }

    public static final DataType DATA_TYPE = DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.STRING()),
      DataTypes.FIELD("events", DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
    );

    public static final TypeInformation<RowData> TYPE_INFO =
      InternalTypeInfo.of(DATA_TYPE.getLogicalType());

    public static final Schema SCHEMA =
      Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
  }

  public static final List<DataEnvelope> TEST_DATA = List.of(
    new DataEnvelope("1", List.of(Map.of("eventType", "enter"))),
    new DataEnvelope("2", List.of(Map.of("eventType", "dialog"), Map.of("eventType", "exit")))
  );

  public static void main(String[] args) throws Exception {
    final var env = StreamExecutionEnvironment.getExecutionEnvironment();
    final var tenv = StreamTableEnvironment.create(env);

    final var inRows = TEST_DATA.stream().map(DataEnvelope::toRowData).collect(Collectors.toList());
    final var inStream = env.fromCollection(inRows, DataEnvelope.TYPE_INFO);
    final var inTable = tenv.fromDataStream(inStream, DataEnvelope.SCHEMA);

    // print test data
    inTable.execute().print();

    // issue #1
    inTable.select(
      Expressions.$("events").at(1).at("eventType").as("firstEventType")
    ).execute().print();

    // issue #2
    inTable.select(
      Expressions.$("events").at(1).as("firstEvent")
    ).execute().print();

    env.close();
  }
}
```
h3. Test

I also tried to work out a test, but I might need some guidance from people with better knowledge of the subject.

One of the obvious places to test this (from a novice's perspective) is the `ExpressionResolverTest` class. That route didn't work, though: the `FunctionLookup` mock there is quite limited in the function definitions it supports. Bringing `PlannerExpression`, which is used in the production code (see `org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.ResolvingCallVisitor#runLegacyTypeInference`), into the picture doesn't really work either, as it would introduce a circular dependency between `flink-table-api-java` and `flink-table-planner_2.12`.
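
If a planner-side test is acceptable instead, here is a minimal sketch of what I have in mind, assuming it lives in `flink-table-planner`, where both the API and the planner are on the classpath. The class, method, and assertion style are illustrative, not existing Flink tests; it mirrors the reproduction above because the table apparently needs to be backed by internal row data (`StringData`) for the mismatch to show up:

```
// Hypothetical regression test sketch; names are illustrative, not existing
// Flink tests. Assumed to live in flink-table-planner's test scope.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatCode;

class ItemAtMapAccessTest {

  private static final DataType DATA_TYPE = DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.STRING()),
      DataTypes.FIELD("events",
          DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))));

  @Test
  void mapValueByIndexAndKeyShouldResolve() {
    final var env = StreamExecutionEnvironment.getExecutionEnvironment();
    final var tenv = StreamTableEnvironment.create(env);

    // One row: id = "1", events = [ {eventType -> enter} ], all as internal data.
    final var row = new GenericRowData(2);
    row.setField(0, StringData.fromString("1"));
    row.setField(1, new GenericArrayData(new Object[] {
        new GenericMapData(Map.of(
            StringData.fromString("eventType"), StringData.fromString("enter")))
    }));

    final TypeInformation<RowData> typeInfo =
        InternalTypeInfo.of(DATA_TYPE.getLogicalType());
    final var table = tenv.fromDataStream(
        env.fromCollection(List.of((RowData) row), typeInfo),
        Schema.newBuilder().fromRowDataType(DATA_TYPE).build());

    // Currently fails with: "Map entry access needs a valid key of type
    // 'StringData', found 'String'." (issue #1 above).
    assertThatCode(() -> table.select(
            Expressions.$("events").at(1).at("eventType").as("firstEventType"))
        .execute().collect().close())
        .doesNotThrowAnyException();
  }
}
```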
h3. Diagnosis

The `AT` function definition has a `MISSING` output type strategy, which, according to the code in `ResolveCallByArgumentsRule.getOptionalTypeInference`, makes the flow fall back to `ResolveCallByArgumentsRule.runLegacyTypeInference`. So the first question (again, from a novice's point of view) is: do we really want to fall back to the legacy procedure here? That legacy procedure, which is essentially the call chain `PlannerTypeInferenceUtilImpl -> TypeConversions.fromLegacyInfoToDataType -> LegacyTypeInfoDataTypeConverter.toDataType`, seems to have no knowledge of the `StringData` type, which leads to the legacy wrapper being used for `StringData` in `org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter#createLegacyType`. So the second question is: if the legacy branch here is intentional, should we add proper support for `StringData` there?
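
To make the second question more concrete, a small probe of the legacy converter should surface the RAW wrapping described above. Illustrative only; it assumes `StringDataTypeInfo.INSTANCE` (the internal type info named in the original report) and the planner/runtime classes on the classpath:

```
// Illustrative probe, not a fix: feed the internal StringData type info through
// the legacy converter named above and inspect the result. Per the diagnosis,
// this should come back as RAW('org.apache.flink.table.data.StringData', ?)
// rather than the logical STRING type.
import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class LegacyConversionProbe {
  public static void main(String[] args) {
    final DataType converted =
        LegacyTypeInfoDataTypeConverter.toDataType(StringDataTypeInfo.INSTANCE);
    System.out.println(converted); // expected (per the diagnosis above): a RAW type
  }
}
```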

> ItemAt Expression validation fail in Table API due to type mismatch
> -------------------------------------------------------------------
>
>                 Key: FLINK-33200
>                 URL: https://issues.apache.org/jira/browse/FLINK-33200
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.0, 1.17.1, 1.18.1
>            Reporter: Zhenqiu Huang
>            Priority: Minor
>             Fix For: 1.8.4
>
>
> The table schema is defined as below:
> public static final DataType DATA_TYPE = DataTypes.ROW(
>             DataTypes.FIELD("id", DataTypes.STRING()),
>             DataTypes.FIELD("events", 
> DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
>         );
> public static final Schema SCHEMA = 
> Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
> inTable.select(Expressions.$("events").at(1).at("eventType").as("firstEventType"))
> The validation fails because "eventType" is inferred as
> BasicTypeInfo.STRING_TYPE_INFO, while the table key internally is a
> StringDataTypeInfo. The validation fails at
> case mti: MapTypeInfo[_, _] =>
>         if (key.resultType == mti.getKeyTypeInfo) {
>           ValidationSuccess
>         } else {
>           ValidationFailure(
>             s"Map entry access needs a valid key of type " +
>               s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
>         }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
