[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192108#comment-16192108
 ] 

ASF GitHub Bot commented on FLINK-7657:
---------------------------------------

Github user kmurra commented on the issue:

    https://github.com/apache/flink/pull/4746
  
    Regarding the time zones, I think I understand your argument here.  Is 
there anything in particular that you would want me to change overall that you 
haven't already outlined to account for that? I do want to document why we're 
doing any conversions of time zones since it took me some amount of time to 
understand why it was being done (it looked incorrect to me and several 
other developers at first glance).
    
    Also, I noticed that the Calcite fromCalendarFields simply takes the fields 
directly from the Calendar, so making time-zone adjustments is unnecessary 
after I made the changes to toRexNode.  I'll fix that as well in my next 
commit.


> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7657
>                 URL: https://issues.apache.org/jira/browse/FLINK-7657
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Kent Murra
>            Assignee: Kent Murra
>            Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it causes an exception 
> (attached below).  The result is that any SQL query with a timestamp, date, or 
> time literal is unexecutable if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>       at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>       at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>       at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>       at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>       at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>       at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>       at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>       at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
>       at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
>       ... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem 
> with the way a Calcite AST is translated into an Expression tree for the 
> predicates. Calcite parses timestamps as Calendar values, and you'll note in 
> [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
>  that a Calendar value is being passed as-is to the 
> [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54]
>  which does no conversion of the value. The Literal, in turn, [expects the 
> value to be a java.util.Date 
> subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
>  which is where the exception arises.
> I've done some informal testing of a bugfix where I convert the calendars to 
> java.sql.Date/java.sql.Time/java.sql.Timestamp in 
> RexNodeToExpressionConverter and had good results. Here is some reproduction 
> code in Scala. I am using Flink version 1.3.2 and running it in local mode 
> (Right-click + Run-as in IntelliJ). 
> {code:none}
> package kmurra
> import java.sql.Date
> import java.util
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
> TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, 
> FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = 
> TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>   tables.registerTableSource("test_table", source)
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE 
> '2017-05-01'").writeToSink(sink)
> }
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, 
> getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends 
> BatchTableSource[Row] with FilterableTableSource[Row] {
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
> SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = 
> new TestTableSource(true)
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
>  Build system is SBT 
> {code:none} 
> name := "kmurra-flink-reproduction"
> organization := "kmurra" 
> version := "1.0"
> scalaVersion := "2.11.8"
> resolvers ++= Seq("Apache Development Snapshot Repository" at 
> "https://repository.apache.org/content/repositories/snapshots/", 
> Resolver.mavenLocal)
> val flinkVersion = "1.3.2"
> libraryDependencies ++= Seq(
>   "org.apache.flink"     %% "flink-scala"             % flinkVersion,//     % 
> "provided",
>   "org.apache.flink"     %% "flink-table"             % flinkVersion,//     % 
> "provided",
>   "org.apache.flink"     %% "flink-avro"              % flinkVersion,//    % 
> "provided",
>   "org.apache.flink"     %% "flink-streaming-scala"   % flinkVersion,//    % 
> "provided",
>   "org.apache.flink"     %  "flink-jdbc"              % flinkVersion,//     % 
> "provided"
> )
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x => MergeStrategy.first
> }
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in 
> assembly).value.copy(includeScala = false)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to