Hello,

Consider the following snippet:

>     Table sourceTable = getKafkaSource0(tEnv);
>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>
> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>     stream.print();
>
where sourceTable.printSchema() shows:

> root
>  |-- time1: TimeIndicatorTypeInfo(rowtime)



 This program returns the following exception:

> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
> at app.metatron.test.Main2.main(Main2.java:231)
> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
> cast to java.lang.Long*
> * at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> ...


The row serializer seems to try to deep-copy an instance of
java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?

Best,

- Dongwon

p.s. though removing .returns() makes everything okay, I need to do that as
I want to convert DataStream<Row> into another table later.
p.s. the source table is created as follows:

private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>     ConnectorDescriptor connectorDescriptor = new Kafka()
>       .version("universal")
>       .topic("mytopic")
>       .property("bootstrap.servers", "localhost:9092")
>       .property("group.id", "mygroup")
>       .startFromEarliest();
>     FormatDescriptor formatDescriptor = new Csv()
>       .deriveSchema()
>       .ignoreParseErrors()
>       .fieldDelimiter(',');
>     Schema schemaDescriptor = new Schema()
>       .field("time1", SQL_TIMESTAMP())
>       .rowtime(
>         new Rowtime()
>           .timestampsFromField("rowTime")
>           .watermarksPeriodicBounded(100)
>       );
>     tEnv.connect(connectorDescriptor)
>       .withFormat(formatDescriptor)
>       .withSchema(schemaDescriptor)
>       .inAppendMode()
>       .registerTableSource("mysrc");
>     return tEnv.scan("mysrc");
>   }

Reply via email to