Hello,

Consider the following snippet:

> Table sourceTable = getKafkaSource0(tEnv);
> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>     .map(a -> a)
>     .returns(sourceTable.getSchema().toRowType());
> stream.print();

where sourceTable.printSchema() shows:

> root
>  |-- time1: TimeIndicatorTypeInfo(rowtime)

This program fails with the following exception:

> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>     at app.metatron.test.Main2.main(Main2.java:231)
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>     ...

The RowSerializer seems to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.

Could anybody help me?

Best,

- Dongwon

P.S. Removing .returns() makes the exception go away, but I need the call because I want to convert the DataStream<Row> into another table later.

P.P.S. The source table is created as follows:

> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>   ConnectorDescriptor connectorDescriptor = new Kafka()
>     .version("universal")
>     .topic("mytopic")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("group.id", "mygroup")
>     .startFromEarliest();
>   FormatDescriptor formatDescriptor = new Csv()
>     .deriveSchema()
>     .ignoreParseErrors()
>     .fieldDelimiter(',');
>   Schema schemaDescriptor = new Schema()
>     .field("time1", SQL_TIMESTAMP())
>     .rowtime(
>       new Rowtime()
>         .timestampsFromField("rowTime")
>         .watermarksPeriodicBounded(100)
>     );
>   tEnv.connect(connectorDescriptor)
>     .withFormat(formatDescriptor)
>     .withSchema(schemaDescriptor)
>     .inAppendMode()
>     .registerTableSource("mysrc");
>   return tEnv.scan("mysrc");
> }
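
To make my suspicion about the serializer mismatch concrete, here is a minimal sketch with no Flink dependency. FieldCopier, LongCopier, TimestampCopier and copyRow are hypothetical names I made up to mimic how a row serializer delegates to per-field serializers; this is not Flink's actual code. It assumes that the type information passed to .returns() selects a Long-based serializer for the rowtime field while the value in the row is still a java.sql.Timestamp.

```java
import java.sql.Timestamp;

class SerializerMismatchSketch {

    /** Hypothetical stand-in for a per-field serializer; only copy() is modelled. */
    interface FieldCopier<T> {
        T copy(T from);
    }

    /** Plays the role of a serializer chosen for a field declared as Long. */
    static final class LongCopier implements FieldCopier<Long> {
        @Override
        public Long copy(Long from) {
            return from; // Long is immutable, so no real copy is needed
        }
    }

    /** Plays the role of a serializer chosen for a field declared as java.sql.Timestamp. */
    static final class TimestampCopier implements FieldCopier<Timestamp> {
        @Override
        public Timestamp copy(Timestamp from) {
            Timestamp to = new Timestamp(from.getTime());
            to.setNanos(from.getNanos());
            return to;
        }
    }

    /** Copies a row field by field, trusting the declared copiers (raw types on purpose). */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object[] copyRow(FieldCopier[] copiers, Object[] row) {
        Object[] out = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            // The erased bridge method casts the value to the copier's declared type.
            out[i] = copiers[i].copy(row[i]);
        }
        return out;
    }

    /** Returns true if copying the row with the given copiers throws ClassCastException. */
    @SuppressWarnings("rawtypes")
    static boolean copyFails(FieldCopier[] copiers, Object[] row) {
        try {
            copyRow(copiers, row);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object[] row = { Timestamp.valueOf("2019-07-01 00:00:00") };
        System.out.println("field declared as Long:      fails = "
                + copyFails(new FieldCopier[] { new LongCopier() }, row));
        System.out.println("field declared as Timestamp: fails = "
                + copyFails(new FieldCopier[] { new TimestampCopier() }, row));
    }
}
```

If this mirrors what happens in my job, the copy only succeeds when the declared field type matches the runtime class of the value, which would explain why dropping .returns() avoids the exception.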