[ https://issues.apache.org/jira/browse/FLINK-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512187#comment-16512187 ]
Sergey Nuyanzin commented on FLINK-4542: ---------------------------------------- Hello [~twalthr] could you please clarify some questions here. I have already faced an issue that most of the mentioned operations are not supported in Calcite => I made it support within CALCITE-2355 (I guess it will be available since 1.17.0). At the same time at least two operations available now: cardinality, element e.g. such test passes e.g. https://github.com/apache/flink/compare/master...snuyanzin:FLINK_4542 # Am I right that multisets are not supported in Flink SQL and it would be nice to have it support (look at the example below)? I have not find any related ticket for this in jira. # As I understand Multiset in Flink extends from Map at least it works for MapTypeInfo and MultisetTypeInfo. What about support this operations? Calcite does not allow them for maps and SQL:2003 also says nothing about about. On the other hand at least cardinality, element could work for maps # Am I right that currently 2 functions + multisets could be implemented within this ticket and all other stuff could be moved under FLINK-9134 as depending on Calcite update because of CALCITE-2355 below there are some details about issue relating to multisets in Flink I was able to add it into Table API however faced another issue with SQL: there is no support of multisets in Flink e.g. simple query {code}select multiset[1]{code} or {code}@Test def testUnboundedElement1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) env.setStateBackend(getStateBackend) StreamITCase.clear val sqlQuery = "SELECT MULTISET[1] FROM MyTable" val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] result.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() val expected = List( "1,1", "10,1", "11,1", "12,1", "13,1", "14,1", "15,1", "16,1", "17,1", "18,1", "19,1", "2,1", "20,1", "21,1", "3,1", "4,1", "5,1", "6,1", "7,1", "8,1", "9,1") assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) }{code} fails like {noformat}org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:610) at org.apache.flink.table.runtime.stream.sql.SqlITCase.testUnboundedElement1(SqlITCase.scala:303) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Caused by: java.lang.NullPointerException at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:212) at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:68) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:508) at org.apache.calcite.sql.validate.CollectNamespace.validateImpl(CollectNamespace.java:68) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947) at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) at org.apache.calcite.sql.validate.CollectNamespace.getRowType(CollectNamespace.java:39) at org.apache.calcite.sql.validate.AbstractNamespace.getType(AbstractNamespace.java:126) at org.apache.calcite.sql.validate.CollectNamespace.getType(CollectNamespace.java:39) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1589) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:460) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3969) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3232) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:928) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:903) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613) at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89) ... 30 more {noformat} > Add MULTISET operations > ----------------------- > > Key: FLINK-4542 > URL: https://issues.apache.org/jira/browse/FLINK-4542 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Sergey Nuyanzin > Priority: Minor > > Umbrella issue for MULTISET operations like: > MULTISET UNION, MULTISET UNION ALL, MULTISET EXCEPT, MULTISET EXCEPT ALL, > MULTISET INTERSECT, MULTISET INTERSECT ALL, CARDINALITY, ELEMENT, MEMBER OF, > SUBMULTISET OF, IS A SET, FUSION > At the moment we only support COLLECT. -- This message was sent by Atlassian JIRA (v7.6.3#76005)