Hi,

Thanks for the reference, Jark. In the Pravega connector, the user defines a 
Schema first and then creates the table with a descriptor that uses that 
schema; see [1]. The error also comes from this test case. We also tried the 
recommended `bridgedTo(Timestamp.class)` method in the schema construction, but 
it produced the same error stack trace.
We are also considering switching to the Blink planner implementation. Do you 
think that change would fix this issue?

Here is the full stacktrace:

```
org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 'LocalDateTime' to 'Long'.
    at org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
    at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:87)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceBatchDescriptor(FlinkPravegaTableITCase.java:349)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceUsingDescriptor(FlinkPravegaTableITCase.java:246)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code -1
```
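
For anyone reading along, the trace boils down to a missing cast between 
`LocalDateTime` and `Long`. As a rough illustration of the representations 
involved, here is a minimal sketch that uses only JDK classes. It is not Flink 
or Pravega connector code: the class name `TimestampBridgeSketch`, its method 
names, and the choice of UTC for the epoch conversion are all assumptions made 
for the example.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustration only: plain JDK conversions, not Flink or Pravega connector code.
// The class and method names are hypothetical.
final class TimestampBridgeSketch {

    private TimestampBridgeSketch() {
    }

    // Interprets the zone-less LocalDateTime as UTC (an assumption of this sketch)
    // and returns milliseconds since 1970-01-01T00:00:00Z.
    static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse of toEpochMillis under the same UTC assumption.
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    // java.sql.Timestamp and LocalDateTime carry the same wall-clock fields,
    // so this conversion does not shift the value by any time zone offset.
    static Timestamp toSqlTimestamp(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    // Inverse of toSqlTimestamp.
    static LocalDateTime fromSqlTimestamp(Timestamp value) {
        return value.toLocalDateTime();
    }
}
```

A `LocalDateTime` has no time zone, so any conversion to epoch milliseconds has 
to pick one. The sketch picks UTC explicitly; whether that matches what a given 
planner does internally is something to verify rather than assume.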

[1] https://github.com/pravega/flink-connectors/blob/master/src/test/java/io/pravega/connectors/flink/FlinkPravegaTableITCase.java#L310

Best Regards,
Brian

From: Jark Wu <imj...@gmail.com>
Sent: Thursday, March 19, 2020 20:25
To: Till Rohrmann
Cc: Zhou, Brian; Timo Walther; Jingsong Li; user
Subject: Re: Need help on timestamp type conversion for Table API on Pravega Connector


This may be a similar issue to [1]; let's continue the discussion there.

Best,
Jark

[1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Timetamp-types-incompatible-after-migration-to-1-10-td33784.html#a33791


On Tue, 17 Mar 2020 at 18:05, Till Rohrmann <trohrm...@apache.org> wrote:
Thanks for reporting this issue, Brian. I'm not a Table API expert, but I know 
that there is some ongoing work on the type system. I've pulled Timo and 
Jingsong into the conversation; they might be able to tell you what exactly 
changed and whether the timestamp issue might be caused by those changes.

Cheers,
Till

On Mon, Mar 16, 2020 at 5:48 AM <b.z...@dell.com> wrote:
Hi community,

The Pravega connector provides both Batch and Streaming Table API 
implementations. We use the descriptor API to build the Table source. While 
preparing to upgrade to Flink 1.10, we found that the unit tests for our 
existing Batch Table API no longer pass. There is a type conversion error for 
the Timestamp type with our descriptor Table API. The details are in this issue: 
https://github.com/pravega/flink-connectors/issues/341
We hope someone from the Flink community can help us with some suggestions on 
this issue. Thanks.

Best Regards,
Brian
