[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173959#comment-16173959
 ] 

Kent Murra edited comment on FLINK-7657 at 9/21/17 12:34 AM:
-------------------------------------------------------------

I've noticed that the scope of the problem is a bit larger than what I found.  
As an example, integer literals in a SQL statement values are parsed as 
BigDecimal and not converted when moved to a literal.  When running the test 
cases I'll see Literals that have a BigDecimal value and a BasicTypeInfo.INT 
type.  It tends to work since the underlying engine considers Integers and 
BigDecimal comparable, however, my concern is that it does not match the 
documentation.  I can see people implementing FilterableTableSource being 
surprised by this.  At the same time, fixing the issue might break people who 
depend on the old behavior.

I think for testing purposes, the ideal thing is to have the literal also 
encode the type so that we can do type verification as well in the test cases.  
However the scope of such a change is very large, and I'm not sure the 
maintainers would want to review such a patch.  I can limit the scope of my 
change but my concern is that it won't be a *good* fix and more of a duct-tape 
thing.

Can I get some guidance on whether I should expand the scope or just limit it 
to the time-based literals as originally reported?

Let me know if there are any questions surrounding this.


was (Author: kmurra):
I've noticed that the scope of the problem is a bit larger than what I found.  
As an example, integer literals in a SQL statement values are parsed as 
BigDecimal and not converted when moved to a literal.  When running the test 
cases I'll see Literals that have a BigDecimal value and a BasicTypeInfo.INT 
type.  It tends to work since the underlying engine considers Integers and 
BigDecimal comparable, however, my concern is that it does not match the 
documentation.  I can see people implementing FilterableTableSource being 
surprised by this.  At the same time, fixing the issue might break people who 
depend on the old behavior.

I think for testing purposes, the ideal thing is to have the literal also 
encode the type so that we can do type verification as well in the test cases.  
However the scope of such a change is very large, and I'm not sure the 
maintainers would want to review such a patch.  I can limit the scope of my 
change but my concern is that it won't be a *good* fix and more of a duct-tape 
thing.

Can I get some guidance on the approach I should take?  

Let me know if there are any questions surrounding this.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7657
>                 URL: https://issues.apache.org/jira/browse/FLINK-7657
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Kent Murra
>            Assignee: Kent Murra
>            Priority: Critical
>
> I have a SQL statement using the Tables API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it causes an exception 
> (attached below).  The result is any SQL query with a timestamp, date, or 
> time literal is unexecutable if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>       at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>       at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>       at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>       at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>       at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>       at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>       at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>       at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
>       ... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem 
> with the way a Calcite AST is translated into an Expression tree for the 
> predicates. Calcite parses timestamps as Calendar values, and you'll note in 
> [RegNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
>  that a Calendar value is being passed as-is to the 
> [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54]
>  which does no conversion of the value. The Literal, in turn, [expects the 
> value to be a java.sql.Date 
> subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
>  which is where the exception arises.
> I've done some informal testing of a bugfix where I convert the calendars to 
> java.sql.Date/java.sql.Time/java.sql.Timestamp in 
> RegNodeToExpressionConverter and had good results. Here is some reproduction 
> code in Scala. I am using Flink version 1.3.2 and running it in local mode 
> (Right-click + Run-as in IntelliJ). 
> {code:none}
> package kmurra
> import java.sql.Date
> import java.util
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
> TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, 
> FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = 
> TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>   tables.registerTableSource("test_table", source)
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE 
> '2017-05-01'").writeToSink(sink)
> }
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, 
> getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends 
> BatchTableSource[Row] with FilterableTableSource[Row] {
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = 
> new TestTableSource(true)
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
>  Build system is SBT 
> {code:none} 
> name := "kmurra-flink-reproduction"
> organization := "kmurra" 
> version := "1.0"
> scalaVersion := "2.11.8"
> resolvers ++= Seq("Apache Development Snapshot Repository" at 
> "https://repository.apache.org/content/repositories/snapshots/";, 
> Resolver.mavenLocal)
> val flinkVersion = "1.3.2"
> libraryDependencies ++= Seq(
>   "org.apache.flink"     %% "flink-scala"             % flinkVersion,//     % 
> "provided",
>   "org.apache.flink"     %% "flink-table"             % flinkVersion,//     % 
> "provided",
>   "org.apache.flink"     %% "flink-avro"              % flinkVersion,//    % 
> "provided",
>   "org.apache.flink"     %% "flink-streaming-scala"   % flinkVersion,//    % 
> "provided",
>   "org.apache.flink"     %  "flink-jdbc"              % flinkVersion,//     % 
> "provided"
> )
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x => MergeStrategy.first
> }
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in 
> assembly).value.copy(includeScala = false)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to