[ 
https://issues.apache.org/jira/browse/FLINK-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512187#comment-16512187
 ] 

Sergey Nuyanzin commented on FLINK-4542:
----------------------------------------

Hello [~twalthr],
could you please clarify a few questions here? I have already run into the 
problem that most of the mentioned operations are not supported in Calcite, so 
I added support for them in CALCITE-2355 (I guess it will be available from 
1.17.0). At the same time, at least two operations are available now: 
cardinality and element. For example, the tests in this branch pass: 
https://github.com/apache/flink/compare/master...snuyanzin:FLINK_4542 

# Am I right that multisets are not supported in Flink SQL and that it would be 
nice to support them (see the example below)? I have not found any related 
ticket for this in Jira.
# As I understand it, Multiset in Flink extends Map; at least it works that way 
for MapTypeInfo and MultisetTypeInfo. What about supporting these operations 
for maps as well? Calcite does not allow them for maps, and SQL:2003 says 
nothing about it either. On the other hand, at least cardinality and element 
could work for maps.
# Am I right that the 2 functions plus multisets could be implemented within 
this ticket now, and everything else could be moved under FLINK-9134, since it 
depends on a Calcite update because of CALCITE-2355?
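
To make the second question concrete, here is a minimal sketch of how the two operations could behave on a Map-backed multiset. It is not Flink or Calcite code: the object name MultisetSketch, the Multiset alias, and both helper functions are hypothetical. It assumes a multiset is stored as a map from element to occurrence count, and that ELEMENT returns the sole element, NULL for an empty collection, and an error otherwise.

```scala
// Hypothetical model, not Flink code: a multiset stored as
// element -> occurrence count, i.e. a Map-backed layout.
object MultisetSketch {
  type Multiset[T] = Map[T, Int]

  // CARDINALITY: total number of elements, duplicates included.
  def cardinality[T](m: Multiset[T]): Int = m.values.sum

  // ELEMENT: the sole element of a one-element collection,
  // None (standing in for SQL NULL) for an empty one, an error otherwise.
  def element[T](m: Multiset[T]): Option[T] =
    cardinality(m) match {
      case 0 => None
      case 1 => m.collectFirst { case (e, n) if n > 0 => e }
      case n =>
        throw new IllegalArgumentException(
          s"ELEMENT expects at most one element, got $n")
    }
}
```

For a plain map the same functions would need a different definition (for example, counting entries instead of summing values), which is part of what the question above is about.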

Below are some details about the issue with multisets in Flink.
I was able to add them to the Table API, but I hit another issue with SQL: 
there is no support for multisets in Flink SQL, e.g. a simple query {code}select 
multiset[1]{code} or {code}@Test
  def testUnboundedElement1(): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStateBackend(getStateBackend)
    StreamITCase.clear

    val sqlQuery = "SELECT MULTISET[1] FROM MyTable"

    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)

    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = List(
      "1,1", "10,1", "11,1", "12,1", "13,1", "14,1", "15,1", "16,1", "17,1",
      "18,1", "19,1",
      "2,1", "20,1", "21,1", "3,1", "4,1", "5,1", "6,1", "7,1", "8,1", "9,1")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }{code} fails like {noformat}org.apache.flink.table.api.ValidationException: SQL validation failed. null

        at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
        at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:610)
        at org.apache.flink.table.runtime.stream.sql.SqlITCase.testUnboundedElement1(SqlITCase.scala:303)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:212)
        at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:68)
        at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:508)
        at org.apache.calcite.sql.validate.CollectNamespace.validateImpl(CollectNamespace.java:68)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
        at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
        at org.apache.calcite.sql.validate.CollectNamespace.getRowType(CollectNamespace.java:39)
        at org.apache.calcite.sql.validate.AbstractNamespace.getType(AbstractNamespace.java:126)
        at org.apache.calcite.sql.validate.CollectNamespace.getType(CollectNamespace.java:39)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1589)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:460)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3969)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3232)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:928)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:903)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
        at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
        ... 30 more
{noformat}



> Add MULTISET operations
> -----------------------
>
>                 Key: FLINK-4542
>                 URL: https://issues.apache.org/jira/browse/FLINK-4542
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Sergey Nuyanzin
>            Priority: Minor
>
> Umbrella issue for MULTISET operations like:
> MULTISET UNION, MULTISET UNION ALL, MULTISET EXCEPT, MULTISET EXCEPT ALL, 
> MULTISET INTERSECT, MULTISET INTERSECT ALL, CARDINALITY, ELEMENT, MEMBER OF, 
> SUBMULTISET OF, IS A SET, FUSION
> At the moment we only support COLLECT.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
