[ https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773466#comment-17773466 ]
Andriy Onyshchuk commented on FLINK-33200:
------------------------------------------

Hello, team. Here is a short summary of this bug.

h3. Given

While working on a projection over a dataset whose data is represented as an array of maps of string to string ({{ARRAY<MAP<STRING, STRING>>}}), several issues were identified.

h3. Data Schema

{code:java}
DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.STRING()),
    DataTypes.FIELD("events", DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
)
{code}

h3. Issues

h4. Issue 1. Extraction of map values by index and key.

{code:java}
inTable.select(
    Expressions.$("events").at(1).at("eventType").as("firstEventType")
).execute().print();
{code}

results in

{noformat}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Map entry access needs a valid key of type 'StringData', found 'String'.
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:284)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:164)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:164)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:98)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:97)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:147)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:98)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:97)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:92)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:93)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$concatenateRules$2(ExpressionResolver.java:247)
    at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:210)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:199)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:174)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:142)
    at IssueDemo.main(IssueDemo.java:73)
{noformat}
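To make the error message above concrete: the map keys stored in the internal row data are {{StringData}} instances, while the key literal produced by {{.at("eventType")}} stays an external {{java.lang.String}}. Below is a small stand-alone sketch of that representation gap (the class name is mine, not planner code):

{code:java}
import org.apache.flink.table.data.StringData;

public class KeyRepresentationGap {
    public static void main(String[] args) {
        // The key as stored in the table's MapData (internal representation).
        StringData internalKey = StringData.fromString("eventType");
        // The key literal passed to .at(...) (external representation).
        String externalKey = "eventType";

        // Logically the same value...
        System.out.println(internalKey.toString().equals(externalKey)); // true
        // ...but different classes; the legacy validation compares the type
        // information of these two representations and rejects the call.
        System.out.println(internalKey.getClass().getName()); // an internal StringData implementation
        System.out.println(externalKey.getClass().getName()); // java.lang.String
    }
}
{code}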
h4. Issue 2. Extraction of the entire map by index.

{code:java}
inTable.select(
    Expressions.$("events").at(1).as("firstEvent")
).execute().print();
{code}

results in

{noformat}
Query schema: [firstEvent: MAP<STRING, STRING>]
Sink schema:  [firstEvent: MAP<RAW('org.apache.flink.table.data.StringData', ?), RAW('org.apache.flink.table.data.StringData', ?)>]
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1005)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:349)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:307)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:268)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertCollectToRel(DynamicSinkUtils.java:154)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:230)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:947)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
    at IssueDemo.main(IssueDemo.java:80)
{noformat}
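For anyone hitting this before a fix lands: expressing the same projections in SQL instead of the Expression DSL may be a workaround, under the assumption that the SQL path does not go through the legacy {{PlannerTypeInferenceUtilImpl}} inference. This is only a sketch; I have not verified it on every affected version:

{code:java}
// Possible workaround sketch: route both projections through SQL.
// Assumes the SQL accessors (events[1]['eventType']) resolve the map key
// against the internal StringData type correctly -- unverified.
tenv.createTemporaryView("in_table", inTable);

// issue #1: index into the array, then into the map
tenv.sqlQuery("SELECT events[1]['eventType'] AS firstEventType FROM in_table")
    .execute().print();

// issue #2: project the whole map element
tenv.sqlQuery("SELECT events[1] AS firstEvent FROM in_table")
    .execute().print();
{code}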
h3. Issue Reproduction

Full {{IssueDemo.java}} source code:

{code:java}
import lombok.AllArgsConstructor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IssueDemo {

    @AllArgsConstructor
    public static class DataEnvelope {
        public final String id;
        public final List<Map<String, String>> events;

        public RowData toRowData() {
            final var row = new GenericRowData(2);
            row.setField(0, StringData.fromString(id));
            row.setField(1, eventsMapData());
            return row;
        }

        private ArrayData eventsMapData() {
            return new GenericArrayData(events.stream()
                    .map(event -> new GenericMapData(event
                            .entrySet()
                            .stream()
                            .collect(Collectors.toMap(
                                    e -> StringData.fromString(e.getKey()),
                                    e -> StringData.fromString(e.getValue())
                            ))))
                    .toArray(Object[]::new)
            );
        }

        public static final DataType DATA_TYPE = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.STRING()),
                DataTypes.FIELD("events", DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
        );

        public static final TypeInformation<RowData> TYPE_INFO = InternalTypeInfo.of(DATA_TYPE.getLogicalType());

        public static final Schema SCHEMA = Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
    }

    public static final List<DataEnvelope> TEST_DATA = List.of(
            new DataEnvelope("1", List.of(Map.of("eventType", "enter"))),
            new DataEnvelope("2", List.of(Map.of("eventType", "dialog"), Map.of("eventType", "exit")))
    );

    public static void main(String[] args) throws Exception {
        final var env = StreamExecutionEnvironment.getExecutionEnvironment();
        final var tenv = StreamTableEnvironment.create(env);

        final var inRows = TEST_DATA.stream().map(DataEnvelope::toRowData).collect(Collectors.toList());
        final var inStream = env.fromCollection(inRows, DataEnvelope.TYPE_INFO);
        final var inTable = tenv.fromDataStream(inStream, DataEnvelope.SCHEMA);

        // print test data
        inTable.execute().print();

        // issue #1
        inTable.select(
                Expressions.$("events").at(1).at("eventType").as("firstEventType")
        ).execute().print();

        // issue #2
        inTable.select(
                Expressions.$("events").at(1).as("firstEvent")
        ).execute().print();

        env.close();
    }
}
{code}

h3. Test

I also tried to work out a test, but I might need some guidance from people with better knowledge of the subject. From a novice perspective, one of the obvious places to test this is the {{ExpressionResolverTest}} class. That route did not work, though, because the {{FunctionLookup}} mock there is quite limited in the function definitions it supports. Adding {{PlannerExpression}}, which is used in the production code path (see {{org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.ResolvingCallVisitor#runLegacyTypeInference}}), to the picture does not really work either, as it introduces a circular dependency between {{flink-table-api-java}} and {{flink-table-planner_2.12}}. A rough sketch of what an end-to-end regression test could look like instead is shown below.
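This is a minimal ITCase-style sketch built on the reproduction above; the class and method names are hypothetical, and it reuses the {{IssueDemo}} helpers rather than the resolver-level mocks:

{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThatCode;

import java.util.stream.Collectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.Test;

// Hypothetical regression test; names are placeholders.
class ItemAtOnArrayOfMapsTest {

    @Test
    void itemAtByIndexAndKeyShouldResolve() {
        final var env = StreamExecutionEnvironment.getExecutionEnvironment();
        final var tenv = StreamTableEnvironment.create(env);

        final var inRows = IssueDemo.TEST_DATA.stream()
                .map(IssueDemo.DataEnvelope::toRowData)
                .collect(Collectors.toList());
        final var inTable = tenv.fromDataStream(
                env.fromCollection(inRows, IssueDemo.DataEnvelope.TYPE_INFO),
                IssueDemo.DataEnvelope.SCHEMA);

        // Currently fails with "Map entry access needs a valid key of type
        // 'StringData', found 'String'."; a fix should make this pass.
        assertThatCode(() -> inTable
                .select($("events").at(1).at("eventType").as("firstEventType"))
                .execute()
                .collect()
                .close())
            .doesNotThrowAnyException();
    }
}
{code}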
h3. Diagnosis

The {{AT}} function definition has a {{MISSING}} output type strategy, which, according to the code in {{ResolveCallByArgumentsRule.getOptionalTypeInference}}, makes the flow fall back to {{ResolveCallByArgumentsRule.runLegacyTypeInference}}. So the first question (again, from a novice point of view) is: do we really want to fall back to the legacy procedure here?

The legacy procedure, essentially the call chain {{PlannerTypeInferenceUtilImpl -> TypeConversions.fromLegacyInfoToDataType -> LegacyTypeInfoDataTypeConverter.toDataType}}, seems to have no knowledge of the {{StringData}} type, which leads to the use of a legacy wrapper for {{StringData}} in {{org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter#createLegacyType}}. So the second question is: if the legacy logical branch here is intentional, should we add proper support for {{StringData}} there?

> ItemAt Expression validation fail in Table API due to type mismatch
> -------------------------------------------------------------------
>
>                 Key: FLINK-33200
>                 URL: https://issues.apache.org/jira/browse/FLINK-33200
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.0, 1.17.1, 1.18.1
>            Reporter: Zhenqiu Huang
>            Priority: Minor
>             Fix For: 1.8.4
>
>
> The table schema is defined as below:
> public static final DataType DATA_TYPE = DataTypes.ROW(
>         DataTypes.FIELD("id", DataTypes.STRING()),
>         DataTypes.FIELD("events", DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
> );
> public static final Schema SCHEMA = Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
> inTable.select(Expressions.$("events").at(1).at("eventType").as("firstEventType"))
> The validation fails because "eventType" is inferred as BasicTypeInfo.STRING_TYPE_INFO, while the table key internally is a StringDataTypeInfo. The validation fails at:
> case mti: MapTypeInfo[_, _] =>
>   if (key.resultType == mti.getKeyTypeInfo) {
>     ValidationSuccess
>   } else {
>     ValidationFailure(
>       s"Map entry access needs a valid key of type " +
>         s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
>   }
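As a postscript to the Diagnosis above, here is a small probe approximating the conversion the legacy chain performs on the internal string type. It is only a sketch: wrapping {{StringData}} in a generic {{TypeInformation}} is my approximation of the failing path, not the exact planner input.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.utils.TypeConversions;

public class LegacyConversionProbe {
    public static void main(String[] args) {
        // Type information for the internal string representation, roughly
        // the map-key side of the failing comparison quoted above.
        final TypeInformation<StringData> stringDataInfo =
                TypeInformation.of(StringData.class);

        // Expected to come back as a legacy RAW-style type rather than
        // STRING, matching the RAW('org.apache.flink.table.data.StringData', ?)
        // seen in the Issue 2 sink schema.
        System.out.println(TypeConversions.fromLegacyInfoToDataType(stringDataInfo));
    }
}
{code}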