[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173959#comment-16173959 ]
Kent Murra commented on FLINK-7657:
-----------------------------------

I've noticed that the scope of the problem is a bit larger than what I originally found. For example, integer values are passed as BigDecimal: when running the test cases I see Literals that have a BigDecimal value and a BasicTypeInfo.INT_TYPE_INFO type. It tends to work because the underlying engine considers Integer and BigDecimal comparable; however, my concern is that it does not match the documentation. I can see people implementing FilterableTableSource being surprised by this. At the same time, fixing the issue might break people who depend on the old behavior.

For testing purposes, I think the ideal thing is to have the literal also encode the type, so that the test cases can verify types as well. However, the scope of such a change is very large, and I'm not sure the maintainers would want to review such a patch. I can limit the scope of my change, but my concern is that it won't be a *good* fix and will be more of a duct-tape thing.

Can I get some guidance on the approach I should take? Let me know if there are any questions surrounding this.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1, 1.3.2
> Reporter: Kent Murra
> Assignee: Kent Murra
> Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When
> the execution environment tries to optimize the SQL, it throws an exception
> (attached below). The result is that any SQL query with a timestamp, date, or
> time literal is unexecutable if any table source is marked with
> FilterableTableSource.
> {code:none}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>     at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>     at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>     at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>     at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>     at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>     at scala.App$class.main(App.scala:76)
>     at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>     at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
>     at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>     at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>     at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>     at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.List.map(List.scala:285)
>     at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>     at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>     at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
>     at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
>     ... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem
> with the way a Calcite AST is translated into an Expression tree for the
> predicates. Calcite parses timestamps as Calendar values, and you'll note in
> [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
> that a Calendar value is passed as-is to the
> [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54],
> which does no conversion of the value. The Literal, in turn, [expects the
> value to be a java.util.Date subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
> which is where the exception arises.
> I've done some informal testing of a bugfix where I convert the calendars to
> java.sql.Date/java.sql.Time/java.sql.Timestamp in
> RexNodeToExpressionConverter and had good results. Here is some reproduction
> code in Scala. I am using Flink version 1.3.2 and running it in local mode
> (Right-click + Run-as in IntelliJ).
> {code:none}
> package kmurra
>
> import java.sql.Date
> import java.util
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
>
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
>
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>   tables.registerTableSource("test_table", source)
>   // The DATE literal in the WHERE clause is what triggers the ClassCastException during optimization.
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
> }
>
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
>
> // Implementing FilterableTableSource is enough to make the optimizer attempt the filter push-down.
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)
>
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
> Build system is SBT
> {code:none}
> name := "kmurra-flink-reproduction"
> organization := "kmurra"
> version := "1.0"
> scalaVersion := "2.11.8"
> resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", Resolver.mavenLocal)
> val flinkVersion = "1.3.2"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
>   "org.apache.flink" %% "flink-table" % flinkVersion,// % "provided",
>   "org.apache.flink" %% "flink-avro" % flinkVersion,// % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,// % "provided",
>   "org.apache.flink" % "flink-jdbc" % flinkVersion // % "provided"
> )
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x => MergeStrategy.first
> }
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
> {code}
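
The bugfix mentioned in the issue description (converting the Calendar values that Calcite produces into java.sql.Date/java.sql.Time/java.sql.Timestamp before they reach the Literal) might look roughly like the sketch below. This is only an illustration, not the actual patch: `CalendarLiterals`, `SqlTimeKind` and `toSqlValue` are hypothetical names that do not exist in Flink, and the sketch assumes the calendar's epoch milliseconds can be used directly, so any time-zone adjustment is deliberately left out.

```scala
import java.sql.{Date, Time, Timestamp}
import java.util.Calendar

// Hypothetical helper for illustration only; none of these names exist in Flink.
object CalendarLiterals {

  // Stand-in for the declared SQL type of the literal (DATE, TIME or TIMESTAMP).
  sealed trait SqlTimeKind
  case object DateKind extends SqlTimeKind
  case object TimeKind extends SqlTimeKind
  case object TimestampKind extends SqlTimeKind

  // Wraps the calendar's epoch milliseconds in the java.sql class that matches
  // the declared type. All three results are subclasses of java.util.Date,
  // which is the type the cast in the stack trace expects.
  // Assumption: no time-zone shifting is applied here.
  def toSqlValue(cal: Calendar, kind: SqlTimeKind): java.util.Date = kind match {
    case DateKind      => new Date(cal.getTimeInMillis)
    case TimeKind      => new Time(cal.getTimeInMillis)
    case TimestampKind => new Timestamp(cal.getTimeInMillis)
  }
}
```

With a conversion of this kind applied where the literal is created, the value handed to the Literal would already be a java.util.Date subclass, so the cast that fails in the stack trace would no longer see a GregorianCalendar.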