Hi Luke, This issue has been fixed in FLINK-25645 [1]. Please try Flink 1.16 or a later version.
[1] https://issues.apache.org/jira/browse/FLINK-25645 Best, Shammon FY On Tue, May 2, 2023 at 12:18 PM Luke Xiong <leix...@gmail.com> wrote: > Many thanks if anybody could help. I ran into this in version 1.15. > > I have a DataStream of a custom type *X* which has a field *foo* of type > *Map<String, > Long>*. I need to query this DataStream to find elements that meet the > conditions like *foo['bar'] == value*. > > The table is created with > *tenv.createTemporaryView(tableName, tenv.fromDataStream(XStream, schema))* > then > *tenv.toChangelogStream(tenv.sqlQuery(sql))* > or > *tenv.toDataStream(tenv.sqlQuery(sql))* > > My code works in streaming mode, but while in batch mode, I got the > following error: > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: Unsupported type(MAP<STRING, BIGINT>) to generate > hash code, the type(MAP<STRING, BIGINT>) is not supported as a > GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156) > Caused by: java.lang.UnsupportedOperationException: Unsupported > type(MAP<STRING, BIGINT>) to generate hash code, the type(MAP<STRING, > BIGINT>) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field. 
> at > org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:317) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:102) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > at > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateCodeBody(HashCodeGenerator.scala:95) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateRowHash(HashCodeGenerator.scala:60) > at > org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:323) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:102) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > at > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateCodeBody(HashCodeGenerator.scala:95) > at > org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateRowHash(HashCodeGenerator.scala:60) > at > 
org.apache.flink.table.planner.codegen.HashCodeGenerator.generateRowHash(HashCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.createHashPartitioner(BatchExecExchange.java:211) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.java:160) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:94) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249) > at > org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:65) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) > at > org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:86) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:218) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:245) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:218) > ... > > - Luke >