[ https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207172#comment-16207172 ]
ASF GitHub Bot commented on FLINK-7426: --------------------------------------- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4732 > Table API does not support null values in keys > ---------------------------------------------- > > Key: FLINK-7426 > URL: https://issues.apache.org/jira/browse/FLINK-7426 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.2 > Reporter: Timo Walther > Assignee: Timo Walther > Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > The Table API uses {{keyBy}} internally, however, the generated > {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not > able to serialize null values. This causes issues during checkpointing or > when using the RocksDB state backend. We need to replace all {{keyBy}} calls > with a custom {{RowKeySelector}}. > {code} > class AggregateITCase extends StreamingWithStateTestBase { > private val queryConfig = new StreamQueryConfig() > queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) > @Test > def testDistinct(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStateBackend(getStateBackend) > val tEnv = TableEnvironment.getTableEnvironment(env) > StreamITCase.clear > val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) > .select('b, Null(Types.LONG)).distinct() > val results = t.toRetractStream[Row](queryConfig) > results.addSink(new StreamITCase.RetractingSink).setParallelism(1) > env.execute() > val expected = mutable.MutableList("1,null", "2,null", "3,null", > "4,null", "5,null", "6,null") > assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)